diff --git a/changes.d/fix.6109.md b/changes.d/fix.6109.md new file mode 100644 index 00000000000..36f22c3d4fc --- /dev/null +++ b/changes.d/fix.6109.md @@ -0,0 +1 @@ +Fixed bug affecting job submission where the list of bad hosts was not always reset correctly. \ No newline at end of file diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py index 9881631484b..7235455cace 100644 --- a/cylc/flow/exceptions.py +++ b/cylc/flow/exceptions.py @@ -21,6 +21,7 @@ Dict, Optional, Sequence, + Set, Union, TYPE_CHECKING, ) @@ -455,13 +456,17 @@ class NoPlatformsError(PlatformLookupError): target. place: Where the attempt to get the platform failed. - """ def __init__( - self, identity: str, set_type: str = 'group', place: str = '' + self, + identity: str, + hosts_consumed: Set[str], + set_type: str = 'group', + place: str = '', ): self.identity = identity self.type = set_type + self.hosts_consumed = hosts_consumed if place: self.place = f' during {place}.' else: diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py index 72ccebffe52..d06c84ade92 100644 --- a/cylc/flow/platforms.py +++ b/cylc/flow/platforms.py @@ -302,9 +302,14 @@ def get_platform_from_group( else: platform_names = group['platforms'] - # Return False if there are no platforms available to be selected. 
+ # If there are no platforms available to be selected: if not platform_names: - raise NoPlatformsError(group_name) + hosts_consumed = { + host + for platform in group['platforms'] + for host in platform_from_name(platform)['hosts']} + raise NoPlatformsError( + group_name, hosts_consumed) # Get the selection method method = group['selection']['method'] diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 69b8a22bb97..4ac90d28aeb 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -274,6 +274,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth, if not prepared_tasks: return bad_tasks + auth_itasks = {} # {platform: [itask, ...], ...} for itask in prepared_tasks: @@ -281,6 +282,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth, auth_itasks.setdefault(platform_name, []) auth_itasks[platform_name].append(itask) # Submit task jobs for each platform + # Non-prepared tasks can be considered done for now: done_tasks = bad_tasks for _, itasks in sorted(auth_itasks.items()): @@ -1107,7 +1109,7 @@ def _prep_submit_task_job( Returns: * itask - preparation complete. * None - preparation in progress. - * False - perparation failed. + * False - preparation failed. 
""" if itask.local_job_file_path: @@ -1201,6 +1203,14 @@ def _prep_submit_task_job( itask.summary['platforms_used'][itask.submit_num] = '' # Retry delays, needed for the try_num self._create_job_log_path(workflow, itask) + if isinstance(exc, NoPlatformsError): + # Clear all hosts from all platforms in group from + # bad_hosts: + self.bad_hosts -= exc.hosts_consumed + self._set_retry_timers(itask, rtconfig) + self._prep_submit_task_job_error( + workflow, itask, '(no platforms available)', exc) + return False self._prep_submit_task_job_error( workflow, itask, '(platform not defined)', exc) return False diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py index 6c2649ca0e4..50b523fb50c 100644 --- a/cylc/flow/task_remote_mgr.py +++ b/cylc/flow/task_remote_mgr.py @@ -395,7 +395,10 @@ def remote_tidy(self) -> None: else: LOG.error( NoPlatformsError( - install_target, 'install target', 'remote tidy')) + install_target, + set(), + 'install target', + 'remote tidy')) # Wait for commands to complete for a max of 10 seconds timeout = time() + 10.0 while queue and time() < timeout: diff --git a/tests/integration/test_platforms.py b/tests/integration/test_platforms.py new file mode 100644 index 00000000000..f8187227ceb --- /dev/null +++ b/tests/integration/test_platforms.py @@ -0,0 +1,53 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +"""Integration testing for platforms functionality.""" + + +async def test_prep_submit_task_tries_multiple_platforms( + flow, scheduler, start, mock_glbl_cfg +): + """Preparation tries multiple platforms within a group if the + task platform setting matches a group, and after all platforms + have been tried the hosts matching that platform group are + cleared. + + See https://github.com/cylc/cylc-flow/pull/6109 + """ + global_conf = ''' + [platforms] + [[myplatform]] + hosts = broken + [[anotherbad]] + hosts = broken2 + [platform groups] + [[mygroup]] + platforms = myplatform, anotherbad''' + mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', global_conf) + + wid = flow({ + "scheduling": {"graph": {"R1": "foo"}}, + "runtime": {"foo": {"platform": "mygroup"}} + }) + schd = scheduler(wid, run_mode='live') + async with start(schd): + itask = schd.pool.get_tasks()[0] + itask.submit_num = 1 + # simulate failed attempts to contact the job hosts + schd.task_job_mgr.bad_hosts = {'broken', 'broken2'} + res = schd.task_job_mgr._prep_submit_task_job(schd.workflow, itask) + assert res is False + # ensure the bad hosts have been cleared + assert not schd.task_job_mgr.bad_hosts