Skip to content

Commit

Permalink
log xtrigger errors, workflow_state validation.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Apr 26, 2024
1 parent f28d07f commit 697681f
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 33 deletions.
3 changes: 3 additions & 0 deletions changes.d/5809.break.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The workflow-state command and xtrigger now looks for task outputs (a.k.a triggers)
not the corresponding task messages, as well as task status. This is a consequence
of the new optional output support in Cylc 8.
1 change: 1 addition & 0 deletions changes.d/5809.feat.d
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The workflow-state command and xtrigger is now flow-specific.
1 change: 1 addition & 0 deletions changes.d/5809.fix.d
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Workflow-state command and xtrigger: handle status and outputs the same way.
2 changes: 1 addition & 1 deletion cylc/flow/scripts/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def get_option_parser() -> COP:
parser.add_option(
"-O", "--output", "--message", metavar="OUTPUT",
help="Check for a given task output"
" (--message is deprecated and aliased to task output)",
" (--message is now aliased to task output)",
action="store", dest="output", default=None)

parser.add_option(
Expand Down
31 changes: 23 additions & 8 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ def check_xtrigger(
)
except TypeError as exc:
raise XtriggerConfigError(label, f"{sig_str}: {exc}")

# Specific xtrigger.validate(), if available.
XtriggerManager.try_xtrig_validate_func(
label, fname, fdir, bound_args, sig_str
Expand Down Expand Up @@ -406,6 +407,7 @@ def try_xtrig_validate_func(
"""Call an xtrigger's `validate()` function if it is implemented.
Raise XtriggerConfigError if validation fails.
"""
try:
xtrig_validate_func = get_xtrig_func(fname, 'validate', fdir)
Expand All @@ -416,7 +418,7 @@ def try_xtrig_validate_func(
xtrig_validate_func(bound_args.arguments)
except Exception as exc: # Note: catch all errors
raise XtriggerConfigError(
label, f"{signature_str} validation failed: {exc}"
label, f"{signature_str}\n{exc}"
)

def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None:
Expand Down Expand Up @@ -623,22 +625,35 @@ def callback(self, ctx: 'SubFuncContext'):
Record satisfaction status and function results dict.
Log a warning if the xtrigger functions errors, to distinguish
errors from not-satisfied.
Args:
ctx (SubFuncContext): function context
Raises:
ValueError: if the context given is not active
"""
sig = ctx.get_signature()
self.active.remove(sig)

if ctx.ret_code != 0:
msg = f"ERROR in xtrigger {sig}"
if ctx.err:
msg += f"\n{ctx.err}"

Check warning on line 642 in cylc/flow/xtrigger_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtrigger_mgr.py#L642

Added line #L642 was not covered by tests
LOG.warning(msg)

try:
satisfied, results = json.loads(ctx.out)
except (ValueError, TypeError):
return

LOG.debug('%s: returned %s', sig, results)
if satisfied:
# Newly satisfied
self.data_store_mgr.delta_task_xtrigger(sig, True)
self.workflow_db_mgr.put_xtriggers({sig: results})
LOG.info('xtrigger satisfied: %s = %s', ctx.label, sig)
self.sat_xtrig[sig] = results
self.do_housekeeping = True
if not satisfied:
return

Check warning on line 652 in cylc/flow/xtrigger_mgr.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtrigger_mgr.py#L652

Added line #L652 was not covered by tests

# Newly satisfied
self.data_store_mgr.delta_task_xtrigger(sig, True)
self.workflow_db_mgr.put_xtriggers({sig: results})
LOG.info('xtrigger satisfied: %s = %s', ctx.label, sig)
self.sat_xtrig[sig] = results
self.do_housekeeping = True
2 changes: 1 addition & 1 deletion cylc/flow/xtriggers/suite_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,6 @@ def suite_state(suite, task, point, offset=None, status='succeeded',
point=point,
offset=offset,
status=status,
message=message,
output=message,
cylc_run_dir=cylc_run_dir
)
77 changes: 58 additions & 19 deletions cylc/flow/xtriggers/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import sqlite3
from typing import Dict, Optional, Tuple
from typing import Dict, Optional, Tuple, Any

from metomi.isodatetime.parsers import TimePointParser

from cylc.flow.cycling.util import add_offset
from cylc.flow.dbstatecheck import CylcWorkflowDBChecker
from cylc.flow.pathutil import get_cylc_run_dir
from cylc.flow.workflow_files import infer_latest_run_from_id
from cylc.flow.exceptions import WorkflowConfigError
from cylc.flow.task_state import TASK_STATUSES_ALL


def workflow_state(
Expand All @@ -31,9 +32,10 @@ def workflow_state(
point: str,
offset: Optional[str] = None,
status: Optional[str] = None,
message: Optional[str] = None,
output: Optional[str] = None,
flow_num: Optional[int] = None,
cylc_run_dir: Optional[str] = None
) -> Tuple[bool, Optional[Dict[str, Optional[str]]]]:
) -> Tuple[bool, Dict[str, Optional[str]]]:
"""Connect to a workflow DB and query the requested task state.
* Reports satisfied only if the remote workflow state has been achieved.
Expand Down Expand Up @@ -82,32 +84,69 @@ def workflow_state(
if offset is not None:
point = str(add_offset(point, offset))

try:
checker = CylcWorkflowDBChecker(cylc_run_dir, workflow)
except (OSError, sqlite3.Error):
# Failed to connect to DB; target workflow may not be started.
return (False, None)

point = str(
TimePointParser().parse(
point, dump_format=checker.point_fmt
# Failure to connect to DB will raise exceptions here.
# It could mean the target workflow as not started yet,
# but it could also mean a typo in the workflow ID, so
# so don't hide the error.
checker = CylcWorkflowDBChecker(cylc_run_dir, workflow)

# Point validity can only be checked at run time.
# Bad function arg templating can cause a syntax error.
if checker.point_fmt is None:
# Integer cycling: raises ValueError if bad.
int(point)

Check warning on line 97 in cylc/flow/xtriggers/workflow_state.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/workflow_state.py#L97

Added line #L97 was not covered by tests
else:
# Datetime cycling: raises ISO8601SyntaxError if bad
point = str(
TimePointParser().parse(
point, dump_format=checker.point_fmt
)
)
)

if not message and not status:
if not output and not status:
status = "succeeded"

satisfied = checker.task_state_met(
task, str(point), output=message, status=status
satisfied: bool = checker.task_state_met(
task, point, output=output, status=status
)

results = {
'workflow': workflow,
'task': task,
'point': point,
'point': str(point),
'offset': offset,
'status': status,
'message': message,
'output': output,
'flow_num': str(flow_num),
'cylc_run_dir': cylc_run_dir
}
return satisfied, results


def validate(args: Dict[str, Any]):
"""Validate workflow_state function args from the workflow config.
The rules for are:
* output/status: one at most (defaults to succeeded status)
* flow_num: Must be an integer
* status: Must be a valid status
"""
output = args['output']
status = args['status']
flow_num = args['flow_num']

Check warning on line 137 in cylc/flow/xtriggers/workflow_state.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/workflow_state.py#L135-L137

Added lines #L135 - L137 were not covered by tests

if output is not None and status is not None:
raise WorkflowConfigError(

Check warning on line 140 in cylc/flow/xtriggers/workflow_state.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/workflow_state.py#L140

Added line #L140 was not covered by tests
"Give `status` or `output`, not both"
)

if status is not None and status not in TASK_STATUSES_ALL:
raise WorkflowConfigError(

Check warning on line 145 in cylc/flow/xtriggers/workflow_state.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/workflow_state.py#L145

Added line #L145 was not covered by tests
f"Invalid tasks status '{status}'"
)

if flow_num is not None and not isinstance(flow_num, int):
raise WorkflowConfigError(

Check warning on line 150 in cylc/flow/xtriggers/workflow_state.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/workflow_state.py#L150

Added line #L150 was not covered by tests
"flow_num must be an integer"
)
1 change: 0 additions & 1 deletion tests/functional/xtriggers/03-sequence.t
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,3 @@ __END__

cylc stop --now --max-polls=10 --interval=2 "${WORKFLOW_NAME}"
purge
exit
2 changes: 1 addition & 1 deletion tests/functional/xtriggers/04-suite_state.t
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ init_workflow "$TEST_NAME_BASE" << __FLOW_CONFIG__
[[[R1]]]
graph = @upstream => foo
[[xtriggers]]
upstream = suite_state(suite=thorin/oin/gloin, task=mithril, point=1)
upstream = suite_state(suite=thorin/oin/gloin, task=mithril, point=1, status="succeeded")
[runtime]
[[foo]]
__FLOW_CONFIG__
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ def test_xtrig_validation_wall_clock(
}
})
with pytest.raises(WorkflowConfigError, match=(
r'\[@myxt\] wall_clock\(offset=PT7MH\) validation failed: '
r'\[@myxt\] wall_clock\(offset=PT7MH\)\n'
r'Invalid offset: PT7MH'
)):
validate(id_)
Expand Down Expand Up @@ -392,7 +392,7 @@ def test_xtrig_validation_echo(
})
with pytest.raises(
WorkflowConfigError,
match=r'echo.* Requires \'succeed=True/False\' arg'
match=r'Requires \'succeed=True/False\' arg'
):
validate(id_)

Expand Down

0 comments on commit 697681f

Please sign in to comment.