Skip to content

Commit

Permalink
Manual trigger: run now even when paused.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Nov 29, 2024
1 parent 9ff50f8 commit 0372d91
Show file tree
Hide file tree
Showing 20 changed files with 496 additions and 172 deletions.
1 change: 1 addition & 0 deletions changes.d/6499.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Manual trigger: run tasks immediately even if the workflow is paused.
7 changes: 5 additions & 2 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ async def reload_workflow(schd: 'Scheduler'):

# flush out preparing tasks before attempting reload
schd.reload_pending = 'waiting for pending tasks to submit'
while schd.release_queued_tasks():
while schd.release_tasks_to_run():
# Run the subset of main-loop functionality required to push
# preparing through the submission pipeline and keep the workflow
# responsive (e.g. to the `cylc stop` command).
Expand Down Expand Up @@ -448,9 +448,12 @@ async def force_trigger_tasks(
flow: List[str],
flow_wait: bool = False,
flow_descr: Optional[str] = None,
on_resume: bool = False
):
"""Manual task trigger."""
validate.is_tasks(tasks)
validate.flow_opts(flow, flow_wait)
yield
yield schd.pool.force_trigger_tasks(tasks, flow, flow_wait, flow_descr)
yield schd.pool.force_trigger_tasks(
tasks, flow, flow_wait, flow_descr, on_resume
)
18 changes: 11 additions & 7 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2180,20 +2180,24 @@ class Arguments(TaskMutation.Arguments, FlowMutationArguments):
class Trigger(Mutation, TaskMutation):
class Meta:
description = sstrip('''
Manually trigger tasks.
Manually trigger tasks, even in a paused workflow.
Warning: waiting tasks that are queue-limited will be queued if
triggered, to submit as normal when released by the queue; queued
tasks will submit immediately if triggered, even if that violates
the queue limit (so you may need to trigger a queue-limited task
twice to get it to submit immediately).
Triggering an unqueued task queues it, to run when queue-released.
Triggering a queued task runs it now regardless of queue limiting.
The "on resume" option waits for a paused workflow to be resumed.
Valid for: paused, running workflows.
''')
resolver = partial(mutator, command='force_trigger_tasks')

class Arguments(TaskMutation.Arguments, FlowMutationArguments):
...
on_resume = Boolean(
default_value=False,
description=sstrip('''
Run triggered tasks once the paused workflow is resumed.
''')
)


def _mut_field(cls):
Expand Down
63 changes: 50 additions & 13 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,19 @@ async def run_scheduler(self) -> None:
self.restart_remote_init()
await commands.run_cmd(commands.poll_tasks(self, ['*/*']))

# If we shut down with manually triggered waiting tasks,
# submit them to run now.
pre_prep_tasks = []
for itask in self.pool.get_tasks():
if (
itask.is_manual_submit
and itask.state(TASK_STATUS_WAITING)
):
itask.waiting_on_job_prep = True
pre_prep_tasks.append(itask)

self.start_job_submission(pre_prep_tasks)

self.run_event_handlers(self.EVENT_STARTUP, 'workflow starting')
await asyncio.gather(
*main_loop.get_runners(
Expand Down Expand Up @@ -743,10 +756,10 @@ async def start(self):
self.uuid_str = dict(params)['uuid_str']
else:
self.uuid_str = str(uuid4())
self.task_events_mgr.uuid_str = self.uuid_str

self._configure_contact()
await self.configure(params)
self.task_events_mgr.uuid_str = self.uuid_str
except (KeyboardInterrupt, asyncio.CancelledError, Exception) as exc:
await self.handle_exception(exc)

Expand Down Expand Up @@ -830,6 +843,7 @@ def _load_pool_from_db(self):
self.xtrigger_mgr.load_xtrigger_for_restart)
self.workflow_db_mgr.pri_dao.select_abs_outputs_for_restart(
self.pool.load_abs_outputs_for_restart)

self.pool.load_db_tasks_to_hold()
self.pool.update_flow_mgr()

Expand Down Expand Up @@ -1394,8 +1408,8 @@ def run_event_handlers(self, event, reason=""):
return
self.workflow_event_handler.handle(self, event, str(reason))

def release_queued_tasks(self) -> bool:
"""Release queued tasks, and submit jobs.
def release_tasks_to_run(self) -> bool:
"""Release queued or manually submitted tasks, and submit jobs.
The task queue manages references to task proxies in the task pool.
Expand All @@ -1419,13 +1433,24 @@ def release_queued_tasks(self) -> bool:
submission).
"""
pre_prep_tasks: Set['TaskProxy'] = set()
if (
not self.is_paused
and self.stop_mode is None
self.stop_mode is None
and self.auto_restart_time is None
and self.reload_pending is False
):
pre_prep_tasks = self.pool.release_queued_tasks()
if self.pool.tasks_to_trigger_now:
# manually triggered tasks to run now, workflow paused or not
pre_prep_tasks.update(self.pool.tasks_to_trigger_now)
self.pool.tasks_to_trigger_now = set()

if not self.is_paused:
# release queued tasks
pre_prep_tasks.update(self.pool.release_queued_tasks())
if self.pool.tasks_to_trigger_on_resume:
# and manually triggered tasks to run once workflow resumed
pre_prep_tasks.update(self.pool.tasks_to_trigger_on_resume)
self.pool.tasks_to_trigger_on_resume = set()

Check warning on line 1453 in cylc/flow/scheduler.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/scheduler.py#L1452-L1453

Added lines #L1452 - L1453 were not covered by tests

elif (
(
Expand All @@ -1437,19 +1462,30 @@ def release_queued_tasks(self) -> bool:
self.reload_pending
)
):
# don't release queued tasks, finish processing preparing tasks
pre_prep_tasks = [
# finish processing preparing tasks first
pre_prep_tasks = {
itask for itask in self.pool.get_tasks()
if itask.state(TASK_STATUS_PREPARING)
]
}

# Return, if no tasks to submit.
else:
return False

if not pre_prep_tasks:
return False

# Start the job submission process.
return self.start_job_submission(pre_prep_tasks)

def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool:
"""Start the job submission process for some tasks.
Return True if any were started, else False.
"""
if self.stop_mode is not None:
return False

self.is_updated = True
self.reset_inactivity_timer()

Expand All @@ -1459,9 +1495,10 @@ def release_queued_tasks(self) -> bool:
log = LOG.debug
if self.options.reftest or self.options.genref:
log = LOG.info

for itask in self.task_job_mgr.submit_task_jobs(
self.workflow,
pre_prep_tasks,
itasks,
self.server.curve_auth,
self.server.client_pub_key_dir,
is_simulation=(self.get_run_mode() == RunMode.SIMULATION)
Expand Down Expand Up @@ -1703,7 +1740,7 @@ async def _main_loop(self) -> None:
self.broadcast_mgr.check_ext_triggers(
itask, self.ext_trigger_queue)

if itask.is_ready_to_run():
if itask.is_ready_to_run() and not itask.is_manual_submit:
self.pool.queue_task(itask)

if self.xtrigger_mgr.sequential_spawn_next:
Expand All @@ -1713,7 +1750,7 @@ async def _main_loop(self) -> None:
self.xtrigger_mgr.housekeep(self.pool.get_tasks())

self.pool.clock_expire_tasks()
self.release_queued_tasks()
self.release_tasks_to_run()

if (
self.get_run_mode() == RunMode.SIMULATION
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/scripts/pause.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
Pause a workflow.
This suspends submission of all tasks in a workflow.
This suspends submission of all tasks until the workflow is resumed, except
for tasks manually triggered "now" with `cylc trigger --now`.
Examples:
# pause my_workflow
Expand Down
24 changes: 19 additions & 5 deletions cylc/flow/scripts/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

"""cylc trigger [OPTIONS] ARGS
Force tasks to run regardless of prerequisites.
Manually trigger tasks regardless of prerequisites, even in a paused workflow.
* Triggering an unqueued waiting task queues it, regardless of prerequisites.
* Triggering a queued task submits it, regardless of queue limiting.
* Triggering an active task has no effect (it already triggered).
Triggering an unqueued task queues it, to run when released by the queue.
Triggering a queued task runs it immediately regardless of queue limiting.
So you may need to trigger a task twice if queue limiting is in effect.
Attempts to trigger active tasks (submitted or running) will be ignored.
Examples:
# trigger task foo in cycle 1234 in test
Expand Down Expand Up @@ -74,13 +76,15 @@
$flow: [Flow!],
$flowWait: Boolean,
$flowDescr: String,
$onResume: Boolean,
) {
trigger (
workflows: $wFlows,
tasks: $tasks,
flow: $flow,
flowWait: $flowWait,
flowDescr: $flowDescr
flowDescr: $flowDescr,
onResume: $onResume,
) {
result
}
Expand All @@ -96,7 +100,16 @@ def get_option_parser() -> COP:
multiworkflow=True,
argdoc=[FULL_ID_MULTI_ARG_DOC],
)

add_flow_opts(parser)

parser.add_option(
"--on-resume",
help=r"Run triggered tasks once a paused workflow is resumed.",
action="store_true",
default=False,
dest="on_resume"
)
return parser


Expand All @@ -114,6 +127,7 @@ async def run(options: 'Values', workflow_id: str, *tokens_list):
'flow': options.flow,
'flowWait': options.flow_wait,
'flowDescr': options.flow_descr,
'onResume': options.on_resume,
}
}
return await pclient.async_request('graphql', mutation_kwargs)
Expand Down
Loading

0 comments on commit 0372d91

Please sign in to comment.