From 237e39992eea411dde94a29099f69f7d247855d4 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Thu, 18 Jan 2024 15:35:30 +0000 Subject: [PATCH] set: add some tests * Add test for use case 5 where a custom output is set without also setting a final output. * Add some integration tests to cover interaction with other features. --- tests/functional/cylc-set/00-set-succeeded.t | 4 +- tests/functional/cylc-set/01-off-flow-pre.t | 18 +-- tests/functional/cylc-set/02-off-flow-out.t | 22 +-- tests/functional/cylc-set/03-set-failed.t | 6 +- tests/functional/cylc-set/04-switch.t | 4 +- tests/functional/cylc-set/05-expire.t | 4 +- tests/functional/cylc-set/06-parentless.t | 6 +- tests/functional/cylc-set/08-switch2.t | 4 +- .../functional/cylc-set/08-switch2/flow.cylc | 9 +- .../cylc-set/08-switch2/reference.log | 11 +- tests/integration/conftest.py | 31 ++-- tests/integration/scripts/test_set.py | 151 ++++++++++++++++++ 12 files changed, 222 insertions(+), 48 deletions(-) create mode 100644 tests/integration/scripts/test_set.py diff --git a/tests/functional/cylc-set/00-set-succeeded.t b/tests/functional/cylc-set/00-set-succeeded.t index 009b5c16ff3..f76eeb93d11 100644 --- a/tests/functional/cylc-set/00-set-succeeded.t +++ b/tests/functional/cylc-set/00-set-succeeded.t @@ -16,8 +16,8 @@ # along with this program. If not, see . #------------------------------------------------------------------------------- -# "cylc set" proposal examples. -# Set incomplete failed tasks to succeeded. +# "cylc set" proposal examples: 1 - Carry on as if a failed task had succeeded +# https://cylc.github.io/cylc-admin/proposal-cylc-set.html#1-carry-on-as-if-a-failed-task-had-succeeded . "$(dirname "$0")/test_header" set_test_number 6 diff --git a/tests/functional/cylc-set/01-off-flow-pre.t b/tests/functional/cylc-set/01-off-flow-pre.t index 56eb6d72dd0..818feedd445 100644 --- a/tests/functional/cylc-set/01-off-flow-pre.t +++ b/tests/functional/cylc-set/01-off-flow-pre.t @@ -15,9 +15,9 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . #------------------------------------------------------------------------------- - -# "cylc set" proposal examples. -# Set off-flow prerequisites to prevent a new flow from stalling. +# +# "cylc set" proposal examples: 2 - Set off-flow prerequisites to prevent a new flow from stalling. +# https://cylc.github.io/cylc-admin/proposal-cylc-set.html#2-set-off-flow-prerequisites-to-prep-for-a-new-flow . "$(dirname "$0")/test_header" set_test_number 8 @@ -25,13 +25,13 @@ set_test_number 8 install_and_validate reftest_run -grep_workflow_log_ok ab "1/a does not depend on 1/b_cold:succeeded" -grep_workflow_log_ok ac "1/a does not depend on 1/c_cold:succeeded" +grep_workflow_log_ok "${TEST_NAME_BASE}-ab" "1/a does not depend on 1/b_cold:succeeded" +grep_workflow_log_ok "${TEST_NAME_BASE}-ac" "1/a does not depend on 1/c_cold:succeeded" -grep_workflow_log_ok ba "1/b does not depend on 1/a_cold:succeeded" -grep_workflow_log_ok bc "1/b does not depend on 1/c_cold:succeeded" +grep_workflow_log_ok "${TEST_NAME_BASE}-ba" "1/b does not depend on 1/a_cold:succeeded" +grep_workflow_log_ok "${TEST_NAME_BASE}-bc" "1/b does not depend on 1/c_cold:succeeded" -grep_workflow_log_ok ca "1/c does not depend on 1/a_cold:succeeded" -grep_workflow_log_ok cb "1/c does not depend on 1/b_cold:succeeded" +grep_workflow_log_ok "${TEST_NAME_BASE}-ca" "1/c does not depend on 1/a_cold:succeeded" +grep_workflow_log_ok "${TEST_NAME_BASE}-cb" "1/c does not depend on 1/b_cold:succeeded" purge diff --git a/tests/functional/cylc-set/02-off-flow-out.t b/tests/functional/cylc-set/02-off-flow-out.t index 9f8cee9c029..db3364b65c2 100644 --- a/tests/functional/cylc-set/02-off-flow-out.t +++ b/tests/functional/cylc-set/02-off-flow-out.t @@ -16,8 +16,8 @@ # along with this program. If not, see . #------------------------------------------------------------------------------- -# "cylc set" proposal examples. -# Set off-flow outputs to prevent a new flow from stalling. +# "cylc set" proposal examples: 2 - Set off-flow outputs to prevent a new flow from stalling. +# https://cylc.github.io/cylc-admin/proposal-cylc-set.html#2-set-off-flow-prerequisites-to-prep-for-a-new-flow . "$(dirname "$0")/test_header" set_test_number 11 @@ -29,16 +29,16 @@ reftest_run # - all the required outputs of a_cold # - the requested and implied outputs of b_cold and c_cold -grep_workflow_log_ok grep-a1 '1/a_cold.* setting missed output: submitted' -grep_workflow_log_ok grep-a2 '1/a_cold.* setting missed output: started' -grep_workflow_log_ok grep-a3 'output 1/a_cold:succeeded completed' +grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/a_cold.* setting missed output: submitted' +grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/a_cold.* setting missed output: started' +grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a3" 'output 1/a_cold:succeeded completed' -grep_workflow_log_ok grep-a1 '1/b_cold.* setting missed output: submitted' -grep_workflow_log_ok grep-a2 '1/b_cold.* setting missed output: started' -grep_workflow_log_ok grep-b3 'output 1/b_cold:succeeded completed' +grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/b_cold.* setting missed output: submitted' +grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/b_cold.* setting missed output: started' +grep_workflow_log_ok "${TEST_NAME_BASE}-grep-b3" 'output 1/b_cold:succeeded completed' -grep_workflow_log_ok grep-a1 '1/c_cold.* setting missed output: submitted' -grep_workflow_log_ok grep-a2 '1/c_cold.* setting missed output: started' -grep_workflow_log_ok grep-c3 'output 1/c_cold:succeeded completed' +grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a1" '1/c_cold.* setting missed output: submitted' +grep_workflow_log_ok "${TEST_NAME_BASE}-grep-a2" '1/c_cold.* setting missed output: started' +grep_workflow_log_ok "${TEST_NAME_BASE}-grep-c3" 'output 1/c_cold:succeeded completed' purge diff --git a/tests/functional/cylc-set/03-set-failed.t b/tests/functional/cylc-set/03-set-failed.t index c3592afc0f3..07ba070aa02 100644 --- a/tests/functional/cylc-set/03-set-failed.t +++ b/tests/functional/cylc-set/03-set-failed.t @@ -16,8 +16,8 @@ # along with this program. If not, see . #------------------------------------------------------------------------------- -# "cylc set" proposal examples. -# check that we can set a dead orphaned job to failed. +# "cylc set" proposal examples: 4 -check that we can set a dead orphaned job to failed. +# https://cylc.github.io/cylc-admin/proposal-cylc-set.html#4-set-jobs-to-failed-when-a-job-platform-is-known-to-be-down . "$(dirname "$0")/test_header" set_test_number 4 @@ -39,7 +39,7 @@ cylc stop --now --now --interval=2 --max-polls=5 "${WORKFLOW_NAME}" # - set completion message # - implied outputs reported as already completed -grep_workflow_log_ok grep-3 'set: output 1/foo:failed completed' +grep_workflow_log_ok "${TEST_NAME_BASE}-grep-3" 'set: output 1/foo:failed completed' # Check the DB records all the outputs. sqlite3 ~/cylc-run/"${WORKFLOW_NAME}"/log/db \ diff --git a/tests/functional/cylc-set/04-switch.t b/tests/functional/cylc-set/04-switch.t index 2cfe1103b84..4a48d0f5d01 100644 --- a/tests/functional/cylc-set/04-switch.t +++ b/tests/functional/cylc-set/04-switch.t @@ -16,8 +16,8 @@ # along with this program. If not, see . #------------------------------------------------------------------------------- -# "cylc set" proposal examples. -# check that we can direct future optional branching a certain way +# "cylc set" proposal examples: 5 - Set and complete a future switch task with the "--wait" flag +# https://cylc.github.io/cylc-admin/proposal-cylc-set.html#5-set-switch-tasks-at-an-optional-branch-point-to-direct-the-future-flow . "$(dirname "$0")/test_header" set_test_number 5 diff --git a/tests/functional/cylc-set/05-expire.t b/tests/functional/cylc-set/05-expire.t index f0e5b5869b9..dae60be234e 100644 --- a/tests/functional/cylc-set/05-expire.t +++ b/tests/functional/cylc-set/05-expire.t @@ -16,8 +16,8 @@ # along with this program. If not, see . #------------------------------------------------------------------------------- -# "cylc set" proposal examples. -# check that forced task expiry works +# "cylc set" proposal examples: 6 - check that forced task expiry works +# https://cylc.github.io/cylc-admin/proposal-cylc-set.html#6-expire-a-task . "$(dirname "$0")/test_header" set_test_number 4 diff --git a/tests/functional/cylc-set/06-parentless.t b/tests/functional/cylc-set/06-parentless.t index 7b05b43063b..9ccf16e30e0 100644 --- a/tests/functional/cylc-set/06-parentless.t +++ b/tests/functional/cylc-set/06-parentless.t @@ -16,8 +16,8 @@ # along with this program. If not, see . #------------------------------------------------------------------------------- -# "cylc set" proposal examples. -# Check spawning a parentless task without ignoring xtriggers. +# "cylc set" proposal examples: 7 - Check spawning a parentless task without ignoring xtriggers. +# https://cylc.github.io/cylc-admin/proposal-cylc-set.html#7-spawning-parentless-tasks . "$(dirname "$0")/test_header" set_test_number 3 @@ -25,6 +25,6 @@ set_test_number 3 install_and_validate REFTEST_OPTS="--start-task=1800/a" reftest_run -grep_workflow_log_ok clock "xtrigger satisfied: wall_clock" +grep_workflow_log_ok "${TEST_NAME_BASE}-clock" "xtrigger satisfied: wall_clock" purge diff --git a/tests/functional/cylc-set/08-switch2.t b/tests/functional/cylc-set/08-switch2.t index 06a106ed532..3f947668501 100644 --- a/tests/functional/cylc-set/08-switch2.t +++ b/tests/functional/cylc-set/08-switch2.t @@ -16,8 +16,8 @@ # along with this program. If not, see . #------------------------------------------------------------------------------- -# "cylc set" proposal examples. -# Set and complete a future switch task that is in the pool but runahead limite. +# "cylc set" proposal examples: 5 - Set and complete a future switch task. +# https://cylc.github.io/cylc-admin/proposal-cylc-set.html#5-set-switch-tasks-at-an-optional-branch-point-to-direct-the-future-flow . "$(dirname "$0")/test_header" set_test_number 2 diff --git a/tests/functional/cylc-set/08-switch2/flow.cylc b/tests/functional/cylc-set/08-switch2/flow.cylc index 44edb9a0c82..0a221f1227e 100644 --- a/tests/functional/cylc-set/08-switch2/flow.cylc +++ b/tests/functional/cylc-set/08-switch2/flow.cylc @@ -9,7 +9,7 @@ [scheduling] initial cycle point = 1 - final cycle point = 3 + final cycle point = 4 cycling mode = integer runahead limit = P0 [[graph]] @@ -30,7 +30,12 @@ [[z]] script = """ if (( CYLC_TASK_CYCLE_POINT == 1 )); then - # set future y-path in point 2 + # mark 2/a as succeeded with output y + # (task will be skipped) cylc set "${CYLC_WORKFLOW_ID}//2/a" --out=y,succeeded + elif (( CYLC_TASK_CYCLE_POINT == 2 )); then + # mark 2/a as having generated output y + # (task will re-run and generate output x in the prociess) + cylc set "${CYLC_WORKFLOW_ID}//3/a" --out=y fi """ diff --git a/tests/functional/cylc-set/08-switch2/reference.log b/tests/functional/cylc-set/08-switch2/reference.log index dab80d93838..a41fff43a9e 100644 --- a/tests/functional/cylc-set/08-switch2/reference.log +++ b/tests/functional/cylc-set/08-switch2/reference.log @@ -1,8 +1,17 @@ +# 1/a runs naturally and generates the output "x" 1/a -triggered off [] in flow 1 1/x -triggered off ['1/a'] in flow 1 1/z -triggered off ['1/x'] in flow 1 +# 1/a is artificially completed with the output "y" 2/y -triggered off ['2/a'] in flow 1 2/z -triggered off ['2/y'] in flow 1 +# 1/a has the output "y" is artificially set but is not completed +# (so 1/a will re-run and generate the output "x" naturally) 3/a -triggered off [] in flow 1 3/x -triggered off ['3/a'] in flow 1 -3/z -triggered off ['3/x'] in flow 1 +3/y -triggered off ['3/a'] in flow 1 +3/z -triggered off ['3/y'] in flow 1 +# 1/a runs naturally and generates the output "x" +4/a -triggered off [] in flow 1 +4/x -triggered off ['4/a'] in flow 1 +4/z -triggered off ['4/x'] in flow 1 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index ca7ee981bd9..1af3e2af312 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -24,6 +24,7 @@ from typing import List, TYPE_CHECKING, Set, Tuple, Union from cylc.flow.config import WorkflowConfig +from cylc.flow.id import Tokens from cylc.flow.option_parsers import Options from cylc.flow.pathutil import get_cylc_run_dir from cylc.flow.rundb import CylcWorkflowDAO @@ -544,7 +545,9 @@ def complete(): The scheduler to await. tokens_list: If specified, this will wait for the tasks represented by these - tokens to be marked as completed by the task pool. + tokens to be marked as completed by the task pool. Can use + relative task ids as strings (e.g. '1/a') rather than tokens for + convenience. stop_mode: If tokens_list is not provided, this will wait for the scheduler to be shutdown with the specified mode (default = AUTO, i.e. @@ -561,20 +564,26 @@ def complete(): """ async def _complete( schd, - *tokens_list, + *tokens_list: Union[Tokens, str], stop_mode=StopMode.AUTO, - timeout=60, - ): + timeout: int = 60, + ) -> None: start_time = time() - tokens_list = [tokens.task for tokens in tokens_list] + + _tokens_list: List[Tokens] = [] + for tokens in tokens_list: + if isinstance(tokens, str): + tokens = Tokens(tokens, relative=True) + _tokens_list.append(tokens.task) # capture task completion remove_if_complete = schd.pool.remove_if_complete def _remove_if_complete(itask): + nonlocal _tokens_list ret = remove_if_complete(itask) - if ret and itask.tokens.task in tokens_list: - tokens_list.remove(itask.tokens.task) + if ret and itask.tokens.task in _tokens_list: + _tokens_list.remove(itask.tokens.task) return ret schd.pool.remove_if_complete = _remove_if_complete @@ -595,8 +604,8 @@ def _set_stop(mode=None): schd._set_stop = _set_stop # determine the completion condition - if tokens_list: - condition = lambda: bool(tokens_list) + if _tokens_list: + condition = lambda: bool(_tokens_list) else: condition = lambda: bool(not has_shutdown) @@ -604,9 +613,9 @@ def _set_stop(mode=None): while condition(): # allow the main loop to advance await asyncio.sleep(0) - if time() - start_time > timeout: + if (time() - start_time) > timeout: raise Exception( - f'Timeout waiting for {", ".join(map(str, tokens_list))}' + f'Timeout waiting for {", ".join(map(str, _tokens_list))}' ) # restore regular shutdown logic diff --git a/tests/integration/scripts/test_set.py b/tests/integration/scripts/test_set.py new file mode 100644 index 00000000000..7dbb240533a --- /dev/null +++ b/tests/integration/scripts/test_set.py @@ -0,0 +1,151 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Test "cylc set" functionality. + +Note: see also functional tests +""" + +from cylc.flow.cycling.integer import IntegerPoint +from cylc.flow.data_store_mgr import TASK_PROXIES +from cylc.flow.task_state import TASK_STATUS_WAITING, TASK_STATUS_SUCCEEDED + + +async def test_set_parentless_spawning( + flow, + scheduler, + run, + complete, +): + """Ensure that setting outputs does not interfere with parentless spawning. + + Setting outputs manually causes the logic to follow a different code + pathway to "natural" output satisfaction. If we're not careful this could + lead to "premature shutdown" (i.e. the scheduler thinks it's finished when + it isn't), this test makes sure that's not the case. + """ + id_ = flow({ + 'scheduling': { + 'initial cycle point': '1', + 'cycling mode': 'integer', + 'runahead limit': 'P0', + 'graph': {'P1': 'a => z'}, + }, + }) + schd = scheduler(id_, paused_start=False) + async with run(schd): + # mark cycle 1 as succeeded + schd.pool.set(['1/a', '1/z'], ['succeeded'], None, ['1']) + + # the parentless task "a" should be spawned out to the runahead limit + assert [ + itask.identity for itask in schd.pool.get_tasks() + ] == ['2/a', '3/a'] + + # the workflow should run on to the next cycle + await complete(schd, '2/a', timeout=5) + + +async def test_rerun_incomplete( + flow, + scheduler, + run, + complete, + reflog, +): + """Incomplete tasks should be re-run.""" + id_ = flow({ + 'scheduling': { + 'graph': {'R1': 'a => z'}, + }, + 'runtime': { + # register a custom output + 'a': {'outputs': {'x': 'x'}}, + }, + }) + schd = scheduler(id_, paused_start=False) + async with run(schd): + # generate 1/a:x but do not complete 1/a + schd.pool.set(['1/a'], ['x'], None, ['1']) + triggers = reflog(schd) + await complete(schd) + + assert triggers == { + # the task 1/a should have been run despite the earlier + # setting of the "x" output + ('1/a', None), + ('1/z', ('1/a',)), + } + + +async def test_data_store( + flow, + scheduler, + start, +): + """Test that manually set prereqs/outputs are applied to the data store.""" + id_ = flow({ + 'scheduling': { + 'graph': {'R1': 'a => z'}, + }, + 'runtime': { + # register a custom output + 'a': {'outputs': {'x': 'x'}}, + }, + }) + schd = scheduler(id_) + async with start(schd): + await schd.update_data_structure() + data = schd.data_store_mgr.data[schd.tokens.id] + task_a = data[TASK_PROXIES][ + schd.pool.get_task(IntegerPoint('1'), 'a').tokens.id + ] + + # set the 1/a:succeeded prereq of 1/z + schd.pool.set(['1/z'], None, ['1/a:succeeded'], ['1']) + task_z = data[TASK_PROXIES][ + schd.pool.get_task(IntegerPoint('1'), 'z').tokens.id + ] + await schd.update_data_structure() + assert task_z.prerequisites[0].satisfied is True + + # set 1/a:x the task should be waiting with output x satisfied + schd.pool.set(['1/a'], ['x'], None, ['1']) + await schd.update_data_structure() + assert task_a.state == TASK_STATUS_WAITING + assert task_a.outputs['x'].satisfied is True + assert task_a.outputs['succeeded'].satisfied is False + + # set 1/a:succeeded the task should be succeeded with output x sat + schd.pool.set(['1/a'], ['succeeded'], None, ['1']) + await schd.update_data_structure() + assert task_a.state == TASK_STATUS_SUCCEEDED + assert task_a.outputs['x'].satisfied is True + assert task_a.outputs['succeeded'].satisfied is True + + +async def test_incomplete_detection( + one_conf, + flow, + scheduler, + start, + log_filter, +): + """It should detect and log finished tasks left with incomplete outputs.""" + schd = scheduler(flow(one_conf)) + async with start(schd) as log: + schd.pool.set(['1/one'], ['failed'], None, ['1']) + assert log_filter(log, contains='1/one did not complete')