Skip to content

Commit

Permalink
Fix workflow-state command and xtrigger.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Nov 3, 2023
1 parent 36a6ce9 commit ab469a6
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 95 deletions.
103 changes: 75 additions & 28 deletions cylc/flow/dbstatecheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ def __init__(self, rund, workflow, db_path=None):
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), db_path)
self.conn = sqlite3.connect(db_path, timeout=10.0)

# Get workflow point format.
try:
self.point_fmt = self._get_pt_fmt()
self.back_compat_mode = False
except sqlite3.OperationalError as exc:
# BACK COMPAT: Cylc 7 DB (see method below).
try:
self.point_fmt = self._get_pt_fmt_compat()
self.back_compat_mode = True
except sqlite3.OperationalError:
raise exc # original error

@staticmethod
def display_maps(res):
if not res:
Expand All @@ -75,7 +87,7 @@ def display_maps(res):
for row in res:
sys.stdout.write((", ").join(row) + "\n")

def get_remote_point_format(self):
def _get_pt_fmt(self):
"""Query a workflow database for a 'cycle point format' entry"""
for row in self.conn.execute(
rf'''
Expand All @@ -90,11 +102,16 @@ def get_remote_point_format(self):
):
return row[0]

def get_remote_point_format_compat(self):
"""Query a Cylc 7 suite database for a 'cycle point format' entry.
Back compat for Cylc 8 workflow state triggers targeting Cylc 7 DBs.
"""
def _get_pt_fmt_compat(self):
"""Query a Cylc 7 suite database for 'cycle point format'."""
# BACK COMPAT: Cylc 7 DB
# Workflows parameters table name change.
# from:
# 8.0.x
# to:
# 8.1.x
# remove at:
# 8.x
for row in self.conn.execute(
rf'''
SELECT
Expand All @@ -108,6 +125,10 @@ def get_remote_point_format_compat(self):
):
return row[0]

def get_point_format(self):
    """Return the cycle point format recorded for this workflow DB.

    The format is looked up once, when the checker is constructed, and
    stored on the instance as ``point_fmt``; this accessor simply hands
    the stored value back without touching the database again.
    """
    point_format = self.point_fmt
    return point_format

def state_lookup(self, state):
"""allows for multiple states to be searched via a status alias"""
if state in self.STATE_ALIASES:
Expand All @@ -116,19 +137,33 @@ def state_lookup(self, state):
return [state]

def workflow_state_query(
self, task, cycle, status=None, message=None, mask=None):
"""run a query on the workflow database"""
self, task, cycle, status=None, message=None):
"""Query task status or outputs in workflow database.
Returns a list of tasks with matching status or output message.
NOTE: the task_states table holds the latest state only, so querying
(e.g.) submitted will fail for a task that is running or finished.
Query cycle=2023, status=succeeded:
[[foo, 2023, succeeded], [bar, 2023, succeeded]]
Query task=foo, message="file ready":
[[foo, 2023, "file ready"], [foo, 2024, "file ready"]]
Query task=foo, cycle=2023, message="file ready":
[[foo, 2023, "file ready"]]
"""
stmt_args = []
stmt_wheres = []

if mask is None:
mask = "name, cycle, status"

if message:
target_table = CylcWorkflowDAO.TABLE_TASK_OUTPUTS
mask = "outputs"
mask = "name, cycle, outputs"
else:
target_table = CylcWorkflowDAO.TABLE_TASK_STATES
mask = "name, cycle, status"

stmt = rf'''
SELECT
Expand All @@ -145,37 +180,49 @@ def workflow_state_query(
stmt_wheres.append("cycle==?")
stmt_args.append(cycle)

if status:
if status is not None:
stmt_frags = []
for state in self.state_lookup(status):
stmt_args.append(state)
stmt_frags.append("status==?")
stmt_wheres.append("(" + (" OR ").join(stmt_frags) + ")")

if stmt_wheres:
stmt += " where " + (" AND ").join(stmt_wheres)

# Note we can't use "where outputs==message" because the outputs
# table holds a serialized string of all received outputs.

res = []
for row in self.conn.execute(stmt, stmt_args):
if not all(v is None for v in row):
if row[-1] is not None:
# not all(v is None for v in row):
res.append(list(row))

if message:
# Replace res with a task-states like result,
# [[foo, 2032, message], [foo, 2033, message]]
if self.back_compat_mode:
# Cylc 7 DB: list of {label: message}
res = [
[item[0], item[1], message]
for item in res
if message in json.loads(item[2]).values()
]
else:
# Cylc 8 DB: list of [message]
res = [
[item[0], item[1], message]
for item in res
if message in json.loads(item[2])
]
return res

def task_state_getter(self, task, cycle):
"""used to get the state of a particular task at a particular cycle"""
return self.workflow_state_query(task, cycle, mask="status")[0]

def task_state_met(self, task, cycle, status=None, message=None):
"""used to check if a task is in a particular state"""
res = self.workflow_state_query(task, cycle, status, message)
if status:
return bool(res)
elif message:
return any(
message == value
for outputs_str, in res
for value in json.loads(outputs_str)
)
"""Return True if cycle/task has achieved status or output message."""
return bool(
self.workflow_state_query(task, cycle, status, message)
)

@staticmethod
def validate_mask(mask):
Expand Down
122 changes: 69 additions & 53 deletions cylc/flow/scripts/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,61 @@

r"""cylc workflow-state [OPTIONS] ARGS
Retrieve task states from the workflow database.
Print or poll for task states or outputs in a workflow database.
Print task states retrieved from a workflow database; or (with --task,
--point, and --status) poll until a given task reaches a given state; or (with
--task, --point, and --message) poll until a task receives a given message.
Polling is configurable with --interval and --max-polls; for a one-off
check use --max-polls=1. The workflow database does not need to exist at
the time polling commences but allocated polls are consumed waiting for
it (consider max-polls*interval as an overall timeout).
For specific queries (i.e. task name AND cycle point AND status or output) the
command polls until the status or output is achieved (exit status 0: success);
or the maximum number of polls is reached (exit status 1: failure).
Note for non-cycling tasks --point=1 must be provided.
Poll number and frequency can be overridden with --max-polls and --interval.
For your own workflows the database location is determined by your
site/user config. For other workflows, e.g. those owned by others, or
mirrored workflow databases, use --run-dir=DIR to specify the location.
If the database does not exist at first, polls are consumed waiting for it.
For less specific queries, immediate results are printed after a single check.
For task outputs, specify the output message, not the label used in the graph.
For non-cycling workflows provide --point=1 for specific queries.
To target workflows owned by other users, use --run-dir.
This command can be used to make polling tasks that trigger off of tasks in
other workflows, but see also the built-in workflow_state xtrigger.
WARNING: task status queries return current or latest (most recent) status,
so (e.g.) querying "submitted" will not return tasks that already succeeded.
By contrast, output queries do not depend on order. Cylc 8 records standard
outputs (submitted, succeeded, etc.) along with custom ones, so for non-final
states such as submitted you should use --message not --status.
Examples:
$ cylc workflow-state WORKFLOW_ID --task=TASK --point=POINT --status=STATUS
# returns 0 if TASK.POINT reaches STATUS before the maximum number of
# polls, otherwise returns 1.
$ cylc workflow-state WORKFLOW_ID --task=TASK --point=POINT --status=STATUS \
> --offset=PT6H
# adds 6 hours to the value of CYCLE for carrying out the polling operation.
$ cylc workflow-state WORKFLOW_ID --task=TASK --status=STATUS --task-point
# uses CYLC_TASK_CYCLE_POINT environment variable as the value for the
# CYCLE to poll. This is useful when you want to use cylc workflow-state in a
# cylc task.
# Print the current or latest status of all tasks:
$ cylc workflow-state WORKFLOW_ID
# Print the current or latest status of all tasks named "foo":
$ cylc workflow-state --task=foo WORKFLOW_ID
# Print the current or latest status of all tasks in point 2033:
$ cylc workflow-state --point=2033 WORKFLOW_ID
# Print all tasks with the current or latest status "succeeded":
$ cylc workflow-state --status=succeeded WORKFLOW_ID
# Print all tasks that generated the output message "file1 ready":
$ cylc workflow-state --message="file1 ready" WORKFLOW_ID
# Print all tasks "foo" that generated the output message "file1 ready":
$ cylc workflow-state --task=foo --message="file1 ready" WORKFLOW_ID
# POLL UNTIL task 2033/foo succeeds:
$ cylc workflow-state --task=foo --point=2033 --status=succeeded WORKFLOW_ID
# POLL UNTIL task 2033/foo generates output message "hello":
$ cylc workflow-state --task=foo --point=2033 --message="hello" WORKFLOW_ID
"""

import asyncio
Expand Down Expand Up @@ -115,17 +141,11 @@ def connect(self):
sys.stderr.write('\n')

if connected and self.args['cycle']:
try:
fmt = self.checker.get_remote_point_format()
except sqlite3.OperationalError as exc:
try:
fmt = self.checker.get_remote_point_format_compat()
except sqlite3.OperationalError:
raise exc # original error
if fmt:
my_parser = TimePointParser()
my_point = my_parser.parse(self.args['cycle'], dump_format=fmt)
self.args['cycle'] = str(my_point)
my_point = TimePointParser().parse(
self.args['cycle'],
dump_format=self.checker.get_point_format()
)
self.args['cycle'] = str(my_point)
return connected, self.args['cycle']

async def check(self):
Expand All @@ -142,31 +162,29 @@ def get_option_parser() -> COP:
)

parser.add_option(
"-t", "--task", help="Specify a task to check the state of.",
"-t", "--task", help="Task name to query.",
action="store", dest="task", default=None)

parser.add_option(
"-p", "--point",
help="Specify the cycle point to check task states for.",
help="Cycle point to query.",
action="store", dest="cycle", default=None)

parser.add_option(
"-T", "--task-point",
help="Use the CYLC_TASK_CYCLE_POINT environment variable as the "
"cycle point to check task states for. "
"Shorthand for --point=$CYLC_TASK_CYCLE_POINT",
help="Short for --point=$CYLC_TASK_CYCLE_POINT, in job environments.",
action="store_true", dest="use_task_point", default=False)

parser.add_option(
"-d", "--run-dir",
help="The top level cylc run directory if non-standard. The "
"database should be DIR/WORKFLOW_ID/log/db. Use to interrogate "
"workflows owned by others, etc.; see note above.",
help="cylc-run location, if the workflow is owned by another user."
" The database will be DIR/WORKFLOW_ID/log/db.",
metavar="DIR", action="store", dest="run_dir", default=None)

parser.add_option(
"-s", "--offset",
help="Specify an offset to add to the targeted cycle point",
help="Specify an offset from the target cycle point. Can be useful"
" along with --task-point when polling one workflow from another.",
action="store", dest="offset", default=None)

conds = ("Valid triggering conditions to check for include: '" +
Expand All @@ -180,8 +198,7 @@ def get_option_parser() -> COP:

parser.add_option(
"-S", "--status",
help="Specify a particular status or triggering condition to "
f"check for. {conds}{states}",
help=f"Task status to check for. {conds}{states}",
action="store", dest="status", default=None)

parser.add_option(
Expand Down Expand Up @@ -222,9 +239,6 @@ def main(parser: COP, options: 'Values', workflow_id: str) -> None:
if options.status and options.msg:
raise InputError("cannot poll both status and custom output")

if options.msg and not options.task and not options.cycle:
raise InputError("need a taskname and cyclepoint")

# Exit if an invalid status is requested
if (options.status and
options.status not in TASK_STATUSES_ORDERED and
Expand Down Expand Up @@ -259,12 +273,12 @@ def main(parser: COP, options: 'Values', workflow_id: str) -> None:
raise CylcError("cannot connect to the workflow_id DB")

if options.status and options.task and options.cycle:
# check a task status
# poll for a task status
spoller.condition = options.status
if not asyncio.run(spoller.poll()):
sys.exit(1)
elif options.msg:
# Check for a custom task output
elif options.msg and options.task and options.cycle:
# poll for a custom task output
spoller.condition = "output: %s" % options.msg
if not asyncio.run(spoller.poll()):
sys.exit(1)
Expand All @@ -274,4 +288,6 @@ def main(parser: COP, options: 'Values', workflow_id: str) -> None:
spoller.checker.workflow_state_query(
task=options.task,
cycle=formatted_pt,
status=options.status))
status=options.status,
message=options.msg,
))
23 changes: 9 additions & 14 deletions cylc/flow/xtriggers/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,15 @@ def workflow_state(
except (OSError, sqlite3.Error):
# Failed to connect to DB; target workflow may not be started.
return (False, None)
try:
fmt = checker.get_remote_point_format()
except sqlite3.OperationalError as exc:
try:
fmt = checker.get_remote_point_format_compat()
except sqlite3.OperationalError:
raise exc # original error
if fmt:
my_parser = TimePointParser()
point = str(my_parser.parse(point, dump_format=fmt))
if message is not None:
satisfied = checker.task_state_met(task, point, message=message)
else:
satisfied = checker.task_state_met(task, point, status=status)

point = TimePointParser().parse(
point, dump_format=checker.get_point_format()
)

satisfied = checker.task_state_met(
task, point, message=message, status=status
)

results = {
'workflow': workflow,
'task': task,
Expand Down

0 comments on commit ab469a6

Please sign in to comment.