Commit
🔧 Fixes in last dependency get results. Add Slurm seff job. Fix permissions on patching results (#65)
juanesarango authored Oct 21, 2024
1 parent f13b995 commit 8c8ad75
Showing 11 changed files with 250 additions and 101 deletions.
2 changes: 1 addition & 1 deletion isabl_cli/api.py
@@ -647,7 +647,7 @@ def patch_analysis_status(analysis, status):

def send_error_email(recipients, subject, message):
"""
Sends an error message to recipients.
Send an error message to recipients.
Arguments:
recipients (list): email recipients.
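A minimal usage sketch of the helper documented above (the import path follows the file shown here; recipients and text are illustrative, not part of this commit):

from isabl_cli.api import send_error_email

send_error_email(
    recipients=["analyst@example.org"],           # hypothetical address
    subject="Analysis failed",                    # hypothetical subject
    message="See the head job logs for details.", # hypothetical body
)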
44 changes: 26 additions & 18 deletions isabl_cli/app.py
@@ -1,4 +1,5 @@
"""Abstract Application."""

# pylint: disable=R0201,C0103,too-many-lines

from collections import defaultdict
@@ -99,7 +100,14 @@ class AbstractApplication: # pylint: disable=too-many-public-methods
# Analyses in these status won't be prepared for submission. To re-rerun SUCCEEDED
# analyses see unique_analysis_per_individual. To re-rerun failed analyses use
# either --force or --restart.
skip_status = {"FAILED", "FINISHED", "STARTED", "SUBMITTED", "SUCCEEDED", "REJECTED"}
skip_status = {
"FAILED",
"FINISHED",
"STARTED",
"SUBMITTED",
"SUCCEEDED",
"REJECTED",
}
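# Illustration only, not part of this diff: analyses whose status is in the
# skip_status set above are not prepared for submission. The analysis dicts
# below are hypothetical.
skip_status = {"FAILED", "FINISHED", "STARTED", "SUBMITTED", "SUCCEEDED", "REJECTED"}
analyses = [{"pk": 1, "status": "SUCCEEDED"}, {"pk": 2, "status": "CREATED"}]
to_prepare = [a for a in analyses if a["status"] not in skip_status]
assert [a["pk"] for a in to_prepare] == [2]  # only the CREATED analysis is prepared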

# If any of these errors is raised during the command generation process, the
# submission will continue. Errors or valdation messages are presented at the end.
@@ -320,7 +328,7 @@ def submit_merge_analysis(self, instance):
Use setting SUBMIT_MERGE_ANALYSIS to submit the merge work with custom logic.
Arguments:
unused-argument (dict): a project or individual instance.
instance (obj): a project or individual instance.
"""
submit_merge = system_settings.SUBMIT_MERGE_ANALYSIS

@@ -563,9 +571,11 @@ def application(self):
endpoint="applications",
name=self.NAME,
version=self.VERSION,
assembly={"name": self.ASSEMBLY, "species": self.SPECIES}
if self.ASSEMBLY
else None,
assembly=(
{"name": self.ASSEMBLY, "species": self.SPECIES}
if self.ASSEMBLY
else None
),
)

if application.settings.get("default_client") is None:
@@ -1025,11 +1035,11 @@ def send_analytics(
status = (
"FORCED"
if force
else "RESTARTED"
if restart
else "SUBMITTED"
if commit
else self._staged_message
else (
"RESTARTED"
if restart
else "SUBMITTED" if commit else self._staged_message
)
)
analyses_tuples.append((i, status))
for i, _ in skipped_tuples + invalid_tuples:
@@ -1173,7 +1183,7 @@ def _get_dependencies_results(self, targets, references):
result_args["application_name"] = dependency.get("app").NAME

if "status" in dependency:
result_args["status"] = dependency["status"]
result_args["status"] = dependency["status"]

input_name = dependency.get("name")
inputs[input_name], key = self.get_result(targets[0], **result_args)
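# Illustration only, not part of this diff: a self-contained rendering of the
# optional status filter shown above. Names below are hypothetical.
dependency = {"name": "alignment", "status": "SUCCEEDED"}
result_args = {"application_name": "HYPOTHETICAL_APP"}
if "status" in dependency:
    result_args["status"] = dependency["status"]
assert result_args == {"application_name": "HYPOTHETICAL_APP", "status": "SUCCEEDED"}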
@@ -1835,7 +1845,9 @@ def validate_are_normals(self, experiments):

def validate_individuals(self, targets, references):
"""
Validate pairs are of the same individual if the pipeline is matched;
Validate the individual of the targets and references.
Validate pairs are of the same individual if the pipeline is matched.
Validate pairs are of the different individuals if the pipeline is unmatched.
"""
if references:
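# Illustration only, not part of this diff: a sketch of the matched/unmatched
# rule described in the docstring above. The experiment layout
# (experiment["sample"]["individual"]["pk"]) is an assumption.
def from_same_individual(target, reference):
    return (
        target["sample"]["individual"]["pk"]
        == reference["sample"]["individual"]["pk"]
    )

def check_pair(target, reference, matched):
    if matched:  # matched pipelines: pair must share one individual
        assert from_same_individual(target, reference)
    else:  # unmatched pipelines: pair must come from different individuals
        assert not from_same_individual(target, reference)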
@@ -1870,9 +1882,7 @@ def validate_individuals(self, targets, references):
)

def validate_source(self, experiments, source):
"""
Validate experiments are from a specific source material.
"""
"""Validate experiments are from a specific source material."""
for i in experiments:
assert (
i["sample"]["source"] == source
@@ -1883,9 +1893,7 @@ def validate_source(self, experiments, source):
# -------------------------

def notify_project_analyst(self, analysis, subject, message):
"""
Send email notification to analysts of projects associated with specified analysis.
"""
"""Send email notification to project's analysts of projects."""
projects = []
for target in analysis.targets:
projects.extend(target.projects)
22 changes: 18 additions & 4 deletions isabl_cli/batch_systems/lsf.py
@@ -9,6 +9,7 @@
import os
import random
import re
import shutil
import subprocess

import click
@@ -62,6 +63,7 @@ def submit_lsf(app, command_tuples): # pragma: no cover
requirements=requirements or "",
extra_args=submit_configuration.get("extra_args", ""),
throttle_by=submit_configuration.get("throttle_by", 50),
unbuffer=submit_configuration.get("unbuffer", False),
jobname=(
f"application: {app} | "
f"methods: {', '.join(methods)} | "
@@ -77,7 +79,13 @@ def submit_lsf(app, command_tuples): # pragma: no cover


def submit_lsf_array(
commands, requirements, jobname, extra_args=None, throttle_by=50, wait=False
commands,
requirements,
jobname,
extra_args=None,
throttle_by=50,
wait=False,
unbuffer=False,
): # pragma: no cover
"""
Submit an array of bash scripts.
@@ -94,6 +102,7 @@ def submit_lsf_array(
extra_args (str): extra LSF args.
throttle_by (int): max number of jobs running at same time.
wait (bool): if true, wait until clean command finishes.
unbuffer (bool): if true, will unbuffer the stdout/stderr.
Returns:
str: jobid of clean up job.
@@ -108,7 +117,8 @@ def submit_lsf_array(
datetime.now(system_settings.TIME_ZONE).isoformat(),
)

wait = "-K" if wait else ""
wait_flag = "-K" if wait else ""
unbuffer = "unbuffer" if unbuffer and shutil.which("unbuffer") else ""
os.makedirs(root, exist_ok=True)
jobname += " | rundir: {}".format(root)
total = len(commands)
@@ -121,7 +131,9 @@ def submit_lsf_array(

with open(join(root, "in.%s" % index), "w") as f:
# use random sleep to avoid parallel API hits
f.write(f"sleep {random.uniform(0, 10):.3} && bash {command}")
f.write(
f"sleep {random.uniform(0, 10):.3} && {unbuffer} bash {command}"
)

with open(join(root, "exit_cmd.%s" % index), "w") as f:
f.write(exit_command)
@@ -153,6 +165,8 @@ def submit_lsf_array(
jobid = re.findall("<(.*?)>", jobid)[0]

# clean the execution directory
cmd = f'bsub -J "CLEAN | {jobname}" -w "ended({jobid})" -ti {wait} rm -r {root}'
cmd = (
f'bsub -J "CLEAN | {jobname}" -w "ended({jobid})" -ti {wait_flag} rm -r {root}'
)
jobid = subprocess.check_output(cmd, shell=True).decode("utf-8")
return re.findall("<(.*?)>", jobid)[0]
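The unbuffer wrapping added above only takes effect when the external `unbuffer` utility is found on PATH; a small sketch (not from this commit) of the command line it produces:

import random
import shutil

unbuffer = "unbuffer" if shutil.which("unbuffer") else ""
command = "/path/to/head_job.sh"  # hypothetical script path
line = f"sleep {random.uniform(0, 10):.3} && {unbuffer} bash {command}"
# e.g. "sleep 4.21 && unbuffer bash /path/to/head_job.sh" when unbuffer is installed,
# or "sleep 4.21 &&  bash /path/to/head_job.sh" otherwise (the extra space is harmless)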
21 changes: 17 additions & 4 deletions isabl_cli/batch_systems/sge.py
@@ -8,6 +8,7 @@
from os.path import join
import os
import random
import shutil
import subprocess

from slugify import slugify
@@ -83,6 +84,7 @@ def submit_sge(app, command_tuples): # pragma: no cover
requirements=requirements or "",
extra_args=submit_configuration.get("extra_args", ""),
throttle_by=submit_configuration.get("throttle_by", 50),
unbuffer=submit_configuration.get("unbuffer", False),
jobname=(
f"application: {app} | "
f"methods: {', '.join(methods)} | "
@@ -98,7 +100,13 @@ def submit_sge(app, command_tuples): # pragma: no cover


def submit_sge_array(
commands, requirements, jobname, extra_args=None, throttle_by=50, wait=False
commands,
requirements,
jobname,
extra_args=None,
throttle_by=50,
wait=False,
unbuffer=False,
): # pragma: no cover
"""
Submit an array of bash scripts.
@@ -115,6 +123,7 @@ def submit_sge_array(
extra_args (str): extra LSF args.
throttle_by (int): max number of jobs running at same time.
wait (bool): if true, wait until clean command finishes.
unbuffer (bool): if true, will unbuffer the stdout/stderr.
Returns:
str: jobid of clean up job.
@@ -129,7 +138,8 @@ def submit_sge_array(
datetime.now(system_settings.TIME_ZONE).isoformat(),
)

wait = "-sync y" if wait else ""
wait_flag = "-sync y" if wait else ""
unbuffer = "unbuffer" if unbuffer and shutil.which("unbuffer") else ""
os.makedirs(root, exist_ok=True)
jobname += "-rundir: {}".format(root)
jobname = slugify(jobname)
@@ -144,11 +154,14 @@ def submit_sge_array(

with open(join(root, "in.%s" % index), "w") as f:
# use random sleep to avoid parallel API hits
sge_command = (
f"sleep {random.uniform(0, 10):.3} && {unbuffer} bash {command}"
)
f.write(
COMMAND.format(
exit_command=exit_command,
exit_log=join(rundir, "head_job.exit"),
command=f"sleep {random.uniform(0, 10):.3} && bash {command}",
command=sge_command,
)
)

@@ -174,7 +187,7 @@ def submit_sge_array(
jobid = jobid.strip().split(".")[0]

cmd = (
f'qsub {base_args} -N "CLEAN-{jobname}" -hold_jid {jobid} {wait} '
f'qsub {base_args} -N "CLEAN-{jobname}" -hold_jid {jobid} {wait_flag} '
f"-o /dev/null -e /dev/null {root}/clean.sh"
)

48 changes: 30 additions & 18 deletions isabl_cli/batch_systems/slurm.py
@@ -8,6 +8,7 @@
from os.path import join
import os
import random
import shutil
import subprocess

from slugify import slugify
@@ -60,6 +61,7 @@ def submit_slurm(app, command_tuples): # pragma: no cover
requirements=requirements or "",
extra_args=submit_configuration.get("extra_args", ""),
throttle_by=submit_configuration.get("throttle_by", 50),
unbuffer=submit_configuration.get("unbuffer", False),
jobname=(
f"application: {app} | "
f"methods: {', '.join(methods)} | "
@@ -75,14 +77,21 @@ def submit_slurm(app, command_tuples): # pragma: no cover


def submit_slurm_array(
commands, requirements, jobname, extra_args=None, throttle_by=50, wait=False
commands,
requirements,
jobname,
extra_args=None,
throttle_by=50,
wait=False,
unbuffer=False,
): # pragma: no cover
"""
Submit an array of bash scripts.
Two other jobs will also be submitted:
Three other jobs will also be submitted:
EXIT: run exit command if failure.
SEFF: run slurm utility to printout job metrics.
CLEAN: clean temporary files and directories after completion.
Arguments:
@@ -92,6 +101,7 @@ def submit_slurm_array(
extra_args (str): extra SLURM args.
throttle_by (int): max number of jobs running at same time.
wait (bool): if true, wait until clean command finishes.
unbuffer (bool): if true, will unbuffer the stdout/stderr.
Returns:
str: jobid of clean up job.
@@ -106,7 +116,8 @@ def submit_slurm_array(
datetime.now(system_settings.TIME_ZONE).isoformat(),
)

wait = "-W" if wait else ""
wait_flag = "-W" if wait else ""
unbuffer = "unbuffer" if unbuffer and shutil.which("unbuffer") else ""
os.makedirs(root, exist_ok=True)
jobname += "-rundir: {}".format(root)
jobname = slugify(jobname)
@@ -118,24 +129,24 @@ def submit_slurm_array(
index += 1
rundir = abspath(dirname(command))

with open(join(root, "in.%s" % index), "w") as f:
with open(join(root, f"in.{index}"), "w") as f:
# submit a dependency job on failure
# important when the scheduler kills the head job
dependency = "${SLURM_ARRAY_JOB_ID}_${SLURM_ARRAY_TASK_ID}"
after_not_ok_job = (
f"sbatch {extra_args} --depend=afternotok:{dependency} --kill-on-invalid-dep yes "
f'--export=TMP,TMPDIR,TMP_DIR -o {join(rundir, "head_job.exit")} -J "EXIT: {dependency}" '
f"sbatch {extra_args} --depend=afternotok:{dependency} --kill-on-invalid-dep=yes "
f'--export=ALL -o {join(rundir, "head_job.exit")} -J "EXIT: {dependency}" '
f"<< EOF\n#!/bin/bash\n{exit_command}\nEOF\n"
)

# use random sleep to avoid parallel API hits
f.write(
f"#!/bin/bash\n\n"
f"sleep {random.uniform(0, 10):.3} && "
f"({after_not_ok_job}) && bash {command}"
f"sleep {random.uniform(0, 10):.3f} && "
f"({after_not_ok_job}) && {unbuffer} bash {command}"
)

for j in "log", "err", "exit", "slurm":
for j in {"log", "err", "exit", "slurm"}:
src = join(rundir, f"head_job.{j}")
dst = join(root, f"{j}.{index}")
open(src, "w").close()
@@ -144,9 +155,6 @@ def submit_slurm_array(
with open(join(root, "in.sh"), "w") as f:
f.write(f"#!/bin/bash\nbash {root}/in.$SLURM_ARRAY_TASK_ID")

with open(join(root, "clean.sh"), "w") as f:
f.write(f"#!/bin/bash\nrm -rf {root}")

# Main job array
cmd = (
f"sbatch {requirements} {extra_args} --array 1-{total}%{throttle_by} "
@@ -159,19 +167,23 @@ def submit_slurm_array(
seff_jobids = []
for i in range(1, total + 1):
seff_cmd = (
f"sbatch {extra_args} --kill-on-invalid-dep=yes "
f"--dependency=afterany:{jobid}_{i} -o '{root}/slurm.{i}' -J 'SEFF: {jobname}' "
f"--wrap='seff {jobid}_{i}'"
f"sbatch {extra_args} -o /dev/null -e /dev/null "
f"--dependency=afterany:{jobid}_{i} -J 'SEFF: {jobid}_{i}' "
f"--wrap='for sleep_time in 10 20 60 180 360; do sleep $sleep_time;"
f"(seff {jobid}_{i} >> {root}/slurm.{i} || false) && break; done'"
)
seff_jobid = (
subprocess.check_output(seff_cmd, shell=True).decode("utf-8").strip()
)
seff_jobid = subprocess.check_output(seff_cmd, shell=True).decode("utf-8").strip()
seff_jobids.append(seff_jobid.split()[-1])
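# Illustration only, not part of this diff: a Python rendering of the bash
# retry loop wrapped by the SEFF job above. The wrapper retries seff with
# increasing sleeps, presumably because accounting data can lag behind task
# completion, and appends the first successful report to slurm.<i>.
import subprocess
import time

def collect_seff(task_id, outfile):
    for sleep_time in (10, 20, 60, 180, 360):
        time.sleep(sleep_time)
        result = subprocess.run(["seff", task_id], capture_output=True, text=True)
        if result.returncode == 0:
            with open(outfile, "a") as f:
                f.write(result.stdout)
            break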

# Job to clean job array rundir
with open(join(root, "clean.sh"), "w") as f:
f.write(f"#!/bin/bash\nrm -rf {root}")

cmd = (
f"sbatch {extra_args} -J 'CLEAN: {jobname}' {wait} --kill-on-invalid-dep yes "
f"-o /dev/null -e /dev/null --depend=afterany:{':'.join(seff_jobids)} --parsable {root}/clean.sh"
f"sbatch {extra_args} -J 'CLEAN: {dependency}' {wait_flag} -o /dev/null "
f"-e /dev/null --dependency=afterany:{':'.join(seff_jobids)} {root}/clean.sh"
)

return subprocess.check_output(cmd, shell=True).decode("utf-8").strip()
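In summary, the reworked submit_slurm_array wires three kinds of follow-up jobs to the main array, as described in its docstring; the dependency types used by the sbatch calls above are schematized below (placeholders in angle brackets are illustrative):

# follow-up jobs submitted around the main array
FOLLOW_UP_JOBS = {
    "EXIT": "--depend=afternotok:<ARRAY_JOB>_<TASK>",                # run exit_command if a task fails
    "SEFF": "--dependency=afterany:<ARRAY_JOB>_<TASK>",              # collect seff metrics per task
    "CLEAN": "--dependency=afterany:<SEFF_JOB_1>:...:<SEFF_JOB_N>",  # remove the rundir once metrics are written
}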