diff --git a/cylc/flow/task_outputs.py b/cylc/flow/task_outputs.py index 6c0bd8f8ea2..644b7b0dd3c 100644 --- a/cylc/flow/task_outputs.py +++ b/cylc/flow/task_outputs.py @@ -299,3 +299,18 @@ def msg_sort_key(item): except ValueError: ind = 999 return (ind, item[_MESSAGE] or '') + + @staticmethod + def output_sort_key(item): + """Compare by output order. + + Examples: + + >>> this = TaskOutputs.output_sort_key + >>> sorted(['finished', 'started', 'custom'], key=this) + ['started', 'custom', 'finished'] + """ + if item in TASK_OUTPUTS: + return TASK_OUTPUTS.index(item) + # Sort custom outputs after started. + return TASK_OUTPUTS.index(TASK_OUTPUT_STARTED) + .5 diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index dbe60fb7292..d9c418e7137 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1820,6 +1820,7 @@ def _set_outputs_itask( itask.point, itask.tdef, outputs) changed = False + outputs = sorted(outputs, key=itask.state.outputs.output_sort_key) for output in outputs: if itask.state.outputs.is_completed(output): LOG.info(f"output {itask.identity}:{output} completed already") diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index be1f335d45b..e076e6a4495 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -1499,6 +1499,38 @@ async def test_set_outputs_live( ) +async def test_set_outputs_live2( + flow, + scheduler, + start, + log_filter, +): + """Assert that optional outputs are satisfied before completion + outputs to prevent incomplete task warnings. + """ + id_ = flow( + { + 'scheduler': {'allow implicit tasks': 'True'}, + 'scheduling': {'graph': { + 'R1': """ + foo:a => apple + foo:b => boat + """}}, + 'runtime': {'foo': {'outputs': { + 'a': 'xylophone', + 'b': 'yacht'}}} + } + ) + schd = scheduler(id_) + + async with start(schd) as log: + schd.pool.set_prereqs_and_outputs(["1/foo"], None, None, ['all']) + assert not log_filter( + log, + contains="did not complete required outputs: ['a', 'b']" + ) + + async def test_set_outputs_future( flow, scheduler,