From ab469a6ac2279f841aeb0f15ac67e9ec037d3f1f Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Thu, 2 Nov 2023 10:03:40 +1300 Subject: [PATCH] Fix workflow-state command and xtrigger. --- cylc/flow/dbstatecheck.py | 103 ++++++++++++++++------ cylc/flow/scripts/workflow_state.py | 122 +++++++++++++++----------- cylc/flow/xtriggers/workflow_state.py | 23 ++--- 3 files changed, 153 insertions(+), 95 deletions(-) diff --git a/cylc/flow/dbstatecheck.py b/cylc/flow/dbstatecheck.py index ca45b5deba6..15e61630dbd 100644 --- a/cylc/flow/dbstatecheck.py +++ b/cylc/flow/dbstatecheck.py @@ -67,6 +67,18 @@ def __init__(self, rund, workflow, db_path=None): raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), db_path) self.conn = sqlite3.connect(db_path, timeout=10.0) + # Get workflow point format. + try: + self.point_fmt = self._get_pt_fmt() + self.back_compat_mode = False + except sqlite3.OperationalError as exc: + # BACK COMPAT: Cylc 7 DB (see method below). + try: + self.point_fmt = self._get_pt_fmt_compat() + self.back_compat_mode = True + except sqlite3.OperationalError: + raise exc # original error + @staticmethod def display_maps(res): if not res: @@ -75,7 +87,7 @@ def display_maps(res): for row in res: sys.stdout.write((", ").join(row) + "\n") - def get_remote_point_format(self): + def _get_pt_fmt(self): """Query a workflow database for a 'cycle point format' entry""" for row in self.conn.execute( rf''' @@ -90,11 +102,16 @@ def get_remote_point_format(self): ): return row[0] - def get_remote_point_format_compat(self): - """Query a Cylc 7 suite database for a 'cycle point format' entry. - - Back compat for Cylc 8 workflow state triggers targeting Cylc 7 DBs. - """ + def _get_pt_fmt_compat(self): + """Query a Cylc 7 suite database for 'cycle point format'.""" + # BACK COMPAT: Cylc 7 DB + # Workflows parameters table name change. + # from: + # 8.0.x + # to: + # 8.1.x + # remove at: + # 8.x for row in self.conn.execute( rf''' SELECT @@ -108,6 +125,10 @@ def get_remote_point_format_compat(self): ): return row[0] + def get_point_format(self): + """Return the cycle point format of this DB.""" + return self.point_fmt + def state_lookup(self, state): """allows for multiple states to be searched via a status alias""" if state in self.STATE_ALIASES: @@ -116,19 +137,33 @@ def state_lookup(self, state): return [state] def workflow_state_query( - self, task, cycle, status=None, message=None, mask=None): - """run a query on the workflow database""" + self, task, cycle, status=None, message=None): + """Query task status or outputs in workflow database. + + Returns a list of tasks with matching status or output message. + + NOTE: the task_states table holds the latest state only, so querying + (e.g.) submitted will fail for a task that is running or finished. + + Query cycle=2023, status=succeeded: + [[foo, 2023, succeeded], [bar, 2023, succeeded]] + + Query task=foo, message="file ready": + [[foo, 2023, "file ready"], [foo, 2024, "file ready"]] + + Query task=foo, point=2023, message="file ready": + [[foo, 2023, "file ready"]] + + """ stmt_args = [] stmt_wheres = [] - if mask is None: - mask = "name, cycle, status" - if message: target_table = CylcWorkflowDAO.TABLE_TASK_OUTPUTS - mask = "outputs" + mask = "name, cycle, outputs" else: target_table = CylcWorkflowDAO.TABLE_TASK_STATES + mask = "name, cycle, status" stmt = rf''' SELECT @@ -145,37 +180,49 @@ def workflow_state_query( stmt_wheres.append("cycle==?") stmt_args.append(cycle) - if status: + if status is not None: stmt_frags = [] for state in self.state_lookup(status): stmt_args.append(state) stmt_frags.append("status==?") stmt_wheres.append("(" + (" OR ").join(stmt_frags) + ")") + if stmt_wheres: stmt += " where " + (" AND ").join(stmt_wheres) + # Note we can't use "where outputs==message"; because the outputs + # table holds a serialized string of all received outputs. + res = [] for row in self.conn.execute(stmt, stmt_args): - if not all(v is None for v in row): + if row[-1] is not None: + # not all(v is None for v in row): res.append(list(row)) + if message: + # Replace res with a task-states like result, + # [[foo, 2032, message], [foo, 2033, message]] + if self.back_compat_mode: + # Cylc 7 DB: list of {label: message} + res = [ + [item[0], item[1], message] + for item in res + if message in json.loads(item[2]).values() + ] + else: + # Cylc 8 DB list of [message] + res = [ + [item[0], item[1], message] + for item in res + if message in json.loads(item[2]) + ] return res - def task_state_getter(self, task, cycle): - """used to get the state of a particular task at a particular cycle""" - return self.workflow_state_query(task, cycle, mask="status")[0] - def task_state_met(self, task, cycle, status=None, message=None): - """used to check if a task is in a particular state""" - res = self.workflow_state_query(task, cycle, status, message) - if status: - return bool(res) - elif message: - return any( - message == value - for outputs_str, in res - for value in json.loads(outputs_str) - ) + """Return True if cycle/task has achieved status or output message.""" + return bool( + self.workflow_state_query(task, cycle, status, message) + ) @staticmethod def validate_mask(mask): diff --git a/cylc/flow/scripts/workflow_state.py b/cylc/flow/scripts/workflow_state.py index 170b63062d1..93b81ba1ac0 100755 --- a/cylc/flow/scripts/workflow_state.py +++ b/cylc/flow/scripts/workflow_state.py @@ -18,35 +18,61 @@ r"""cylc workflow-state [OPTIONS] ARGS -Retrieve task states from the workflow database. +Print or poll for task states or outputs in a workflow database. -Print task states retrieved from a workflow database; or (with --task, ---point, and --status) poll until a given task reaches a given state; or (with ---task, --point, and --message) poll until a task receives a given message. -Polling is configurable with --interval and --max-polls; for a one-off -check use --max-polls=1. The workflow database does not need to exist at -the time polling commences but allocated polls are consumed waiting for -it (consider max-polls*interval as an overall timeout). +For specific queries (i.e. task name AND cycle point AND status or output) the +command polls until the status or output is achieved (exit status 0: success); +or the maximum number of polls is reached (exit status 1: failure). -Note for non-cycling tasks --point=1 must be provided. +Poll number and frequency can be overridden with --max-polls and --interval. -For your own workflows the database location is determined by your -site/user config. For other workflows, e.g. those owned by others, or -mirrored workflow databases, use --run-dir=DIR to specify the location. +If the database does not exist at first, polls are consumed waiting for it. + +For less specific queries, immediate results are printed after a single check. + +For task outputs, specify the output message, not the label used in the graph. + +For non-cycling workflows provide --point=1 for specific queries. + +To target workflows owned by other users, use --run-dir. + +This command can be used to make polling tasks that trigger off of tasks in +other workflows, but see also the built-in workflow_state xtrigger. + +WARNING: task status queries return current or latest (most recent) status, +so (e.g.) querying "submitted" will not return tasks that already succeeded. +By contrast, output queries do not depend on order. Cylc 8 records standard +outputs (submitted, succeeded, etc.) along with custom ones, so for non-final +states such as submitted you should use --output not --status. Examples: - $ cylc workflow-state WORKFLOW_ID --task=TASK --point=POINT --status=STATUS - # returns 0 if TASK.POINT reaches STATUS before the maximum number of - # polls, otherwise returns 1. - - $ cylc workflow-state WORKFLOW_ID --task=TASK --point=POINT --status=STATUS \ - > --offset=PT6H - # adds 6 hours to the value of CYCLE for carrying out the polling operation. - - $ cylc workflow-state WORKFLOW_ID --task=TASK --status=STATUS --task-point - # uses CYLC_TASK_CYCLE_POINT environment variable as the value for the - # CYCLE to poll. This is useful when you want to use cylc workflow-state in a - # cylc task. + + # Print the current or latest status of all tasks: + $ cylc workflow-state WORKFLOW_ID + + # Print the current or latest status of all tasks named "foo": + $ cylc workflow-state --task=foo WORKFLOW_ID + + # Print the current or latest status of all tasks in point 2033: + $ cylc workflow-state --point=2033 WORKFLOW_ID + + # Print all tasks with the current or latest status "succeeded": + $ cylc workflow-state --status=succeeded CYLC_WORKFLOW_ID + + # Print all tasks that generated the output message "file1 ready": + $ cylc workflow-state --message="file1 ready" WORKFLOW_ID + + # Print all tasks that generated the output message "file1 ready": + $ cylc workflow-state --message="file1 ready" WORKFLOW_ID + + # Print all tasks "foo" that generated the output message "file1 ready": + $ cylc workflow-state --task=foo --message="file1 ready" WORKFLOW_ID + + # POLL UNTIL task 2033/foo succeeds: + $ cylc workflow-state --task=foo --point=2033 --status=succeeded WORKFLOW_ID + + # POLL UNTIL task 2033/foo generates output message "hello": + $ cylc workflow-state --task=foo --point=2033 --message="hello" WORKFLOW_ID """ import asyncio @@ -115,17 +141,11 @@ def connect(self): sys.stderr.write('\n') if connected and self.args['cycle']: - try: - fmt = self.checker.get_remote_point_format() - except sqlite3.OperationalError as exc: - try: - fmt = self.checker.get_remote_point_format_compat() - except sqlite3.OperationalError: - raise exc # original error - if fmt: - my_parser = TimePointParser() - my_point = my_parser.parse(self.args['cycle'], dump_format=fmt) - self.args['cycle'] = str(my_point) + my_point = TimePointParser().parse( + self.args['cycle'], + dump_format=self.checker.get_point_format() + ) + self.args['cycle'] = str(my_point) return connected, self.args['cycle'] async def check(self): @@ -142,31 +162,29 @@ def get_option_parser() -> COP: ) parser.add_option( - "-t", "--task", help="Specify a task to check the state of.", + "-t", "--task", help="Task name to query.", action="store", dest="task", default=None) parser.add_option( "-p", "--point", - help="Specify the cycle point to check task states for.", + help="Cycle point to query.", action="store", dest="cycle", default=None) parser.add_option( "-T", "--task-point", - help="Use the CYLC_TASK_CYCLE_POINT environment variable as the " - "cycle point to check task states for. " - "Shorthand for --point=$CYLC_TASK_CYCLE_POINT", + help="Short for --point=$CYLC_TASK_CYCLE_POINT, in job environments.", action="store_true", dest="use_task_point", default=False) parser.add_option( "-d", "--run-dir", - help="The top level cylc run directory if non-standard. The " - "database should be DIR/WORKFLOW_ID/log/db. Use to interrogate " - "workflows owned by others, etc.; see note above.", + help="cylc-run location, if the workflow is owned by another user." + " The database will be DIR/WORKFLOW_ID/log/db.", metavar="DIR", action="store", dest="run_dir", default=None) parser.add_option( "-s", "--offset", - help="Specify an offset to add to the targeted cycle point", + help="Specify an offset from the target cycle point. Can be useful" + " along with --task-point when polling one workflow from another.", action="store", dest="offset", default=None) conds = ("Valid triggering conditions to check for include: '" + @@ -180,8 +198,7 @@ def get_option_parser() -> COP: parser.add_option( "-S", "--status", - help="Specify a particular status or triggering condition to " - f"check for. {conds}{states}", + help=f"Task status to check for. {conds}{states}", action="store", dest="status", default=None) parser.add_option( @@ -222,9 +239,6 @@ def main(parser: COP, options: 'Values', workflow_id: str) -> None: if options.status and options.msg: raise InputError("cannot poll both status and custom output") - if options.msg and not options.task and not options.cycle: - raise InputError("need a taskname and cyclepoint") - # Exit if an invalid status is requested if (options.status and options.status not in TASK_STATUSES_ORDERED and @@ -259,12 +273,12 @@ def main(parser: COP, options: 'Values', workflow_id: str) -> None: raise CylcError("cannot connect to the workflow_id DB") if options.status and options.task and options.cycle: - # check a task status + # poll for a task status spoller.condition = options.status if not asyncio.run(spoller.poll()): sys.exit(1) - elif options.msg: - # Check for a custom task output + elif options.msg and options.task and options.cycle: + # poll for a custom task output spoller.condition = "output: %s" % options.msg if not asyncio.run(spoller.poll()): sys.exit(1) @@ -274,4 +288,6 @@ def main(parser: COP, options: 'Values', workflow_id: str) -> None: spoller.checker.workflow_state_query( task=options.task, cycle=formatted_pt, - status=options.status)) + status=options.status, + message=options.msg, + )) diff --git a/cylc/flow/xtriggers/workflow_state.py b/cylc/flow/xtriggers/workflow_state.py index f20cd214067..b9962ff3f84 100644 --- a/cylc/flow/xtriggers/workflow_state.py +++ b/cylc/flow/xtriggers/workflow_state.py @@ -90,20 +90,15 @@ def workflow_state( except (OSError, sqlite3.Error): # Failed to connect to DB; target workflow may not be started. return (False, None) - try: - fmt = checker.get_remote_point_format() - except sqlite3.OperationalError as exc: - try: - fmt = checker.get_remote_point_format_compat() - except sqlite3.OperationalError: - raise exc # original error - if fmt: - my_parser = TimePointParser() - point = str(my_parser.parse(point, dump_format=fmt)) - if message is not None: - satisfied = checker.task_state_met(task, point, message=message) - else: - satisfied = checker.task_state_met(task, point, status=status) + + point = TimePointParser().parse( + point, dump_format=checker.get_point_format() + ) + + satisfied = checker.task_state_met( + task, point, message=message, status=status + ) + results = { 'workflow': workflow, 'task': task,