diff --git a/cylc/flow/prerequisite.py b/cylc/flow/prerequisite.py index b4abfe737dc..f6e580dfe26 100644 --- a/cylc/flow/prerequisite.py +++ b/cylc/flow/prerequisite.py @@ -202,7 +202,7 @@ def _conditional_is_satisfied(self): '"%s":\n%s' % (self.get_raw_conditional_expression(), err_msg)) return res - def satisfy_me(self, outputs: Iterable['Tokens']) -> Set[str]: + def satisfy_me(self, outputs: Iterable['Tokens']) -> 'Set[Tokens]': """Attempt to satisfy me with given outputs. Updates cache with the result. @@ -214,7 +214,7 @@ def satisfy_me(self, outputs: Iterable['Tokens']) -> Set[str]: prereq = output.to_prereq_tuple() if prereq not in self.satisfied: continue - valid.add(output.relative_id_with_selectors) + valid.add(output) self.satisfied[prereq] = self.DEP_STATE_SATISFIED if self.conditional_expression is None: self._all_satisfied = all(self.satisfied.values()) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index f0851a9f9d9..59f5844fca5 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1299,7 +1299,7 @@ def spawn_on_output(self, itask, output, forced=False): else: tasks = [c_task] for t in tasks: - t.satisfy_me([f"{itask.identity}:{output}"]) + t.satisfy_me([itask.tokens.duplicate(task_sel=output)]) self.data_store_mgr.delta_task_prerequisite(t) self.add_to_pool(t) @@ -1423,7 +1423,9 @@ def spawn_on_all_outputs( # not spawnable continue if completed_only: - c_task.satisfy_me([f"{itask.identity}:{output}"]) + c_task.satisfy_me( + [itask.tokens.duplicate(task_sel=output)] + ) self.data_store_mgr.delta_task_prerequisite(c_task) self.add_to_pool(c_task) if ( @@ -1604,9 +1606,10 @@ def spawn_task( and itask.tdef.has_abs_triggers and itask.state.prerequisites_are_not_all_satisfied() ): - itask.satisfy_me( - [f"{a[0]}/{a[1]}:{a[2]}" for a in self.abs_outputs_done] - ) + itask.satisfy_me([ + Tokens(cycle=cycle, task=task, task_sel=output) + for cycle, task, output in self.abs_outputs_done + ]) if prev_flow_wait: self._spawn_after_flow_wait(itask) @@ -1716,6 +1719,11 @@ def set( # noqa: A003 # Illegal flow command opts return + _prereqs: List[Tokens] = [ + Tokens(prereq, relative=True) + for prereq in (prereqs or []) + ] + # Get matching pool tasks and future task definitions. itasks, future_tasks, unmatched = self.filter_task_proxies( items, @@ -1725,17 +1733,17 @@ def set( # noqa: A003 for itask in itasks: self.merge_flows(itask, flow_nums) - if prereqs: + if _prereqs: self._set_prereqs_itask( - itask, prereqs, flow_nums, flow_wait) + itask, _prereqs, flow_nums, flow_wait) else: self._set_outputs_itask(itask, outputs) for name, point in future_tasks: tdef = self.config.get_taskdef(name) - if prereqs: + if _prereqs: self._set_prereqs_tdef( - point, tdef, prereqs, flow_nums, flow_wait) + point, tdef, _prereqs, flow_nums, flow_wait) else: trans = self._get_task_proxy( point, tdef, flow_nums, flow_wait, transient=True) @@ -1778,7 +1786,7 @@ def _set_outputs_itask( def _set_prereqs_itask( self, itask: 'TaskProxy', - prereqs: List[str], + prereqs: List[Tokens], flow_nums: Set[int], flow_wait: bool ) -> None: diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 8fe2aa74b40..c58456f212c 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -37,7 +37,6 @@ from cylc.flow import LOG from cylc.flow.flow_mgr import stringify_flow_nums -from cylc.flow.id import Tokens from cylc.flow.platforms import get_platform from cylc.flow.task_action_timer import TimerFlags from cylc.flow.task_state import ( @@ -57,6 +56,7 @@ from cylc.flow.cycling import PointBase from cylc.flow.task_action_timer import TaskActionTimer from cylc.flow.taskdef import TaskDef + from cylc.flow.id import Tokens class TaskProxy: @@ -518,18 +518,18 @@ def state_reset( return True return False - def satisfy_me(self, outputs: Iterable[str]) -> None: + def satisfy_me(self, outputs: 'Iterable[Tokens]') -> None: """Try to satisfy my prerequisites with given outputs. The output strings are of the form "cycle/task:message" Log a warning for outputs that I don't depend on. """ - tokens = [Tokens(p, relative=True) for p in outputs] - used = self.state.satisfy_me(tokens) + used = self.state.satisfy_me(outputs) for output in set(outputs) - used: LOG.warning( - f"{self.identity} does not depend on {output}" + f"{self.identity} does not depend on" + f" {output.relative_id_with_selectors}" ) def clock_expire(self) -> bool: diff --git a/cylc/flow/task_state.py b/cylc/flow/task_state.py index 1915dac4062..7844e491911 100644 --- a/cylc/flow/task_state.py +++ b/cylc/flow/task_state.py @@ -313,12 +313,12 @@ def __call__( def satisfy_me( self, outputs: Iterable['Tokens'] - ) -> Set[str]: + ) -> Set['Tokens']: """Try to satisfy my prerequisites with given outputs. Return which outputs I actually depend on. """ - valid: Set[str] = set() + valid: Set[Tokens] = set() for prereq in (*self.prerequisites, *self.suicide_prerequisites): yep = prereq.satisfy_me(outputs) if yep: diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 7a0071b61c0..3d8232a90d5 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -32,8 +32,9 @@ from cylc.flow import CYLC_LOG from cylc.flow.cycling.integer import IntegerPoint from cylc.flow.cycling.iso8601 import ISO8601Point -from cylc.flow.task_events_mgr import TaskEventsManager from cylc.flow.data_store_mgr import TASK_PROXIES +from cylc.flow.id import Tokens +from cylc.flow.task_events_mgr import TaskEventsManager from cylc.flow.task_outputs import ( TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED @@ -1523,7 +1524,10 @@ async def test_prereq_satisfaction( assert not b.is_waiting_prereqs_done() # set valid and invalid prerequisites, check log. - b.satisfy_me(["1/a:x", "1/a:y", "1/a:z", "1/a:w"]) + b.satisfy_me([ + Tokens(id_, relative=True) + for id_ in ["1/a:x", "1/a:y", "1/a:z", "1/a:w"] + ]) assert log_filter(log, contains="1/b does not depend on 1/a:z") assert log_filter(log, contains="1/b does not depend on 1/a:w") assert not log_filter(log, contains="1/b does not depend on 1/a:x")