From f0ca51ef1f6f7dbc7f678270c43aa6c3954caa4c Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Fri, 13 May 2022 14:08:57 -0700 Subject: [PATCH 01/19] fix pyyaml install issue --- .isort.cfg | 2 +- scripts/gcp_caper_server/create_instance.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.isort.cfg b/.isort.cfg index 2c982c6f..c061ec84 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -4,7 +4,7 @@ include_trailing_comma = True force_grid_wrap = 0 use_parentheses = True line_length = 88 -known_third_party = WDL,autouri,humanfriendly,matplotlib,numpy,pandas,pyhocon,pytest,requests,setuptools,sklearn +known_third_party = WDL,autouri,distutils,humanfriendly,matplotlib,numpy,pandas,pyhocon,pytest,requests,setuptools,sklearn [mypy-bin] ignore_errors = True diff --git a/scripts/gcp_caper_server/create_instance.sh b/scripts/gcp_caper_server/create_instance.sh index 7cf211e6..c7dd518c 100755 --- a/scripts/gcp_caper_server/create_instance.sh +++ b/scripts/gcp_caper_server/create_instance.sh @@ -295,7 +295,7 @@ sudo psql -d $POSTGRESQL_DB_NAME -c \"create role $POSTGRESQL_DB_USER with super ### upgrade pip and install caper croo sudo python3 -m pip install --upgrade pip -sudo pip install caper croo +sudo pip install --ignore-installed caper croo """ echo "$(date): Google auth with service account key file." From 13876c2a7ac954d606b19710e03856b68320333f Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Tue, 31 May 2022 14:40:17 -0700 Subject: [PATCH 02/19] add hpc sub-commands --- README.md | 8 +- caper/caper_args.py | 71 ++++++++++++++ caper/caper_init.py | 114 +++++++---------------- caper/cli.py | 7 +- caper/cli_hpc.py | 81 ++++++++++++++++ caper/cromwell.py | 2 +- caper/hpc.py | 222 ++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 418 insertions(+), 87 deletions(-) create mode 100644 caper/cli_hpc.py create mode 100644 caper/hpc.py diff --git a/README.md b/README.md index 0e15dea6..d40ea90a 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ See [this](scripts/aws_caper_server/README.md) for details. 1) Make sure that you have Java (>= 11) and Python>=3.6 installed on your system and `pip` to install Caper. ```bash - $ pip install pip --upgrade + $ pip install pip --upgrade $ pip install caper ``` @@ -39,12 +39,12 @@ See [this](scripts/aws_caper_server/README.md) for details. **Backend**|**Description** :--------|:----- local | local computer without cluster engine. - slurm | SLURM cluster. + slurm | SLURM cluster (e.g. Stanford Sherlock and SCG). sge | Sun GridEngine cluster. pbs | PBS cluster. lsf | LSF cluster. - sherlock | Stanford Sherlock (based on `slurm` backend). - scg | Stanford SCG (based on `slurm` backend). + + > **IMPORTANT**: `sherlock` and `scg` backends have been deprecated. Use `slurm` backend instead and define `slurm-partition=` for Sherlock or `slurm-account=` in Caper's configuration file. 
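   For example, the relevant lines in `~/.caper/default.conf` (created by `caper init`, see the next steps) would look like the following. The partition and account names here are placeholders; use your cluster's actual values and define only the ones your cluster requires:

   ```
   # e.g. required on Stanford Sherlock
   slurm-partition=normal
   # e.g. required on Stanford SCG
   slurm-account=your_lab_account
   ```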
```bash $ caper init [BACKEND] diff --git a/caper/caper_args.py b/caper/caper_args.py index b4dda49c..6200a27c 100644 --- a/caper/caper_args.py +++ b/caper/caper_args.py @@ -25,6 +25,12 @@ from .cromwell_rest_api import CromwellRestAPI from .resource_analysis import ResourceAnalysis from .server_heartbeat import ServerHeartbeat +from .hpc import ( + SlurmWrapper, + SgeWrapper, + PbsWrapper, + LsfWrapper, +) DEFAULT_CAPER_CONF = '~/.caper/default.conf' DEFAULT_LIST_FORMAT = 'id,status,name,str_label,user,parent,submission' @@ -521,6 +527,39 @@ def get_parser_and_defaults(conf_file=None): 'This can also be used as a flag to use Conda environment ' 'defined in your WDL file under "workflow.meta.default_conda".', ) + group_hpc_submit = parent_submit.add_argument_group( + title='Parameters for "caper hpc submit" command only', + ) + group_hpc_submit.add_argument( + '--leader-job-name', + help='Leader job name for a submitted workflow.' + 'This name will be appended to the prefix "CAPER_LEADER_" and then ' + 'submitted to HPC. Such prefix is used to identify Caper leader jobs.' + ) + group_hpc_submit.add_argument( + '--slurm-leader-job-resource-param', + help='Resource parameters to submit a Caper leader job to SLURM. ' + 'Make sure to quote if you use it in the command line arguments.', + default=' '.join(SlurmWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM) + ) + group_hpc_submit.add_argument( + '--sge-leader-job-resource-param', + help='Resource parameters to submit a Caper leader job to SGE' + 'Make sure to quote if you use it in the command line arguments.', + default=' '.join(SgeWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM) + ) + group_hpc_submit.add_argument( + '--pbs-leader-job-resource-param', + help='Resource parameters to submit a Caper leader job to PBS' + 'Make sure to quote if you use it in the command line arguments.', + default=' '.join(PbsWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM) + ) + group_hpc_submit.add_argument( + '--lsf-leader-job-resource-param', + help='Resource parameters to submit a Caper leader job to LSF' + 'Make sure to quote if you use it in the command line arguments.', + default=' '.join(LsfWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM) + ) group_slurm = parent_submit.add_argument_group('SLURM arguments') group_slurm.add_argument('--slurm-partition', help='SLURM partition') @@ -727,6 +766,14 @@ def get_parser_and_defaults(conf_file=None): 'This is used for cloud backends only.', ) + # hpc abort + parent_hpc_abort = argparse.ArgumentParser(add_help=False) + parent_hpc_abort.add_argument( + 'job_ids', + nargs='+', + help='Job ID or list of job IDs to abort matching Caper leader jobs.' 
+ ) + # all subcommands p_init = subparser.add_parser( 'init', @@ -810,6 +857,30 @@ def get_parser_and_defaults(conf_file=None): parent_troubleshoot, ], ) + + p_hpc = subparser.add_parser( + 'hpc', + help='Subcommand for HPCs', + parents=[parent_all], + ) + subparser_hpc = p_hpc.add_subparsers(dest='hpc_action') + p_hpc_submit = subparser_hpc.add_parser( + 'submit', + help='Submit a single workflow to HPC.', + parents=[parent_all, parent_submit, parent_run, parent_runner, parent_backend], + ) + + p_hpc_list = subparser_hpc.add_parser( + 'list', + help='List all workflows submitted to HPC.', + parents=[parent_all, parent_backend], + ) + p_hpc_abort = subparser_hpc.add_parser( + 'abort', + help='Abort a workflow submitted to HPC.', + parents=[parent_all, parent_backend, parent_hpc_abort], + ) + p_gcp_monitor = subparser.add_parser( 'gcp_monitor', help='Tabulate task\'s resource data collected on ' diff --git a/caper/caper_init.py b/caper/caper_init.py index 7c701bd2..7ea14a1a 100644 --- a/caper/caper_init.py +++ b/caper/caper_init.py @@ -16,133 +16,101 @@ CromwellBackendSlurm, ) -BACKEND_ALIAS_SHERLOCK = 'sherlock' -BACKEND_ALIAS_SCG = 'scg' - - -CONF_CONTENTS_DB = """ -# Metadata DB for call-caching (reusing previous outputs): -# Cromwell supports restarting workflows based on a metadata DB -# DB is in-memory by default -#db=in-memory - -# If you use 'caper server' then you can use one unified '--file-db' -# for all submitted workflows. In such case, uncomment the following two lines -# and defined file-db as an absolute path to store metadata of all workflows -#db=file -#file-db= - -# If you use 'caper run' and want to use call-caching: -# Make sure to define different 'caper run ... --db file --file-db DB_PATH' -# for each pipeline run. -# But if you want to restart then define the same '--db file --file-db DB_PATH' -# then Caper will collect/re-use previous outputs without running the same task again -# Previous outputs will be simply hard/soft-linked. - -""" +from .hpc import ( + SlurmWrapper, + SgeWrapper, + PbsWrapper, + LsfWrapper, +) -CONF_CONTENTS_LOCAL_HASH_STRAT = """ -# Hashing strategy for call-caching (3 choices) -# This parameter is for local (local/slurm/sge/pbs/lsf) backend only. -# This is important for call-caching, -# which means re-using outputs from previous/failed workflows. -# Cache will miss if different strategy is used. -# "file" method has been default for all old versions of Caper<1.0. -# "path+modtime" is a new default for Caper>=1.0, -# file: use md5sum hash (slow). -# path: use path. -# path+modtime: use path and modification time. -local-hash-strat=path+modtime -""" CONF_CONTENTS_TMP_DIR = """ -# Local directory for localized files and Cromwell's intermediate files -# If not defined, Caper will make .caper_tmp/ on local-out-dir or CWD. +# Local directory for localized files and Cromwell's intermediate files. +# If not defined then Caper will make .caper_tmp/ on `local-out-dir` or CWD. # /tmp is not recommended here since Caper store all localized data files # on this directory (e.g. input FASTQs defined as URLs in input JSON). local-loc-dir= """ CONF_CONTENTS_COMMON_RESOURCE_PARAM_HELP = """ -# This parameter is NOT for 'caper submit' BUT for 'caper run' and 'caper server' only. -# This resource parameter string will be passed to sbatch, qsub, bsub, ... -# You can customize it according to your cluster's configuration. 
- -# Note that Cromwell's implicit type conversion (String to Integer) -# seems to be buggy for WomLong type memory variables (memory_mb and memory_gb). -# So be careful about using the + operator between WomLong and other types (String, even Int). -# For example, ${"--mem=" + memory_mb} will not work since memory_mb is WomLong. -# Use ${"if defined(memory_mb) then "--mem=" else ""}{memory_mb}${"if defined(memory_mb) then "mb " else " "} -# See https://github.com/broadinstitute/cromwell/issues/4659 for details - -# Cromwell's built-in variables (attributes defined in WDL task's runtime) -# Use them within ${} notation. -# - cpu: number of cores for a job (default = 1) -# - memory_mb, memory_gb: total memory for a job in MB, GB -# * these are converted from 'memory' string attribute (including size unit) -# defined in WDL task's runtime -# - time: time limit for a job in hour -# - gpu: specified gpu name or number of gpus (it's declared as String) +# This parameter is for HPC backends only (slurm, sge, pbs, lsf). +# It is not recommended to change it unless your cluster has custom resource settings. +# See https://github.com/ENCODE-DCC/caper/blob/master/docs/resource_param.md for details. """ CONF_CONTENTS_SLURM_PARAM = """ {help_context} slurm-resource-param={slurm_resource_param} +# This parameter is used for `caper hpc submit` command only. +slurm-leader-job-resource-param={slurm_leader_job_resource_param} + # If needed uncomment and define any extra SLURM sbatch parameters here -# YOU CANNOT USE WDL SYNTAX AND CROMWELL BUILT-IN VARIABLES HERE +# (YOU CANNOT USE WDL SYNTAX AND CROMWELL BUILT-IN VARIABLES HERE) #slurm-extra-param= """.format( help_context=CONF_CONTENTS_COMMON_RESOURCE_PARAM_HELP, slurm_resource_param=CromwellBackendSlurm.DEFAULT_SLURM_RESOURCE_PARAM, + slurm_leader_job_resource_param=' '.join(SlurmWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM), ) CONF_CONTENTS_SGE_PARAM = """ {help_context} +sge-resource-param={sge_resource_param} + +# This parameter is used for `caper hpc submit` command only. +sge-leader-job-resource-param={sge_leader_job_resource_param} + # Parallel environment of SGE: # Find one with `$ qconf -spl` or ask you admin to add one if not exists. # If your cluster works without PE then edit the below sge-resource-param sge-pe= -sge-resource-param={sge_resource_param} # If needed uncomment and define any extra SGE qsub parameters here -# YOU CANNOT USE WDL SYNTAX AND CROMWELL BUILT-IN VARIABLES HERE +# (YOU CANNOT USE WDL SYNTAX AND CROMWELL BUILT-IN VARIABLES HERE) #sge-extra-param= """.format( help_context=CONF_CONTENTS_COMMON_RESOURCE_PARAM_HELP, sge_resource_param=CromwellBackendSge.DEFAULT_SGE_RESOURCE_PARAM, + sge_leader_job_resource_param=' '.join(SgeWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM), ) CONF_CONTENTS_PBS_PARAM = """ {help_context} pbs-resource-param={pbs_resource_param} +# This parameter is used for `caper hpc submit` command only. 
+pbs-leader-job-resource-param={pbs_leader_job_resource_param} + # If needed uncomment and define any extra PBS qsub parameters here -# YOU CANNOT USE WDL SYNTAX AND CROMWELL BUILT-IN VARIABLES HERE +# (YOU CANNOT USE WDL SYNTAX AND CROMWELL BUILT-IN VARIABLES HERE) #pbs-extra-param= """.format( help_context=CONF_CONTENTS_COMMON_RESOURCE_PARAM_HELP, pbs_resource_param=CromwellBackendPbs.DEFAULT_PBS_RESOURCE_PARAM, + pbs_leader_job_resource_param=' '.join(PbsWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM), ) CONF_CONTENTS_LSF_PARAM = """ {help_context} lsf-resource-param={lsf_resource_param} +# This parameter is used for `caper hpc submit` command only. +lsf-leader-job-resource-param={lsf_leader_job_resource_param} + # If needed uncomment and define any extra LSF bsub parameters here -# YOU CANNOT USE WDL SYNTAX AND CROMWELL BUILT-IN VARIABLES HERE +# (YOU CANNOT USE WDL SYNTAX AND CROMWELL BUILT-IN VARIABLES HERE) #lsf-extra-param= """.format( help_context=CONF_CONTENTS_COMMON_RESOURCE_PARAM_HELP, lsf_resource_param=CromwellBackendLsf.DEFAULT_LSF_RESOURCE_PARAM, + lsf_leader_job_resource_param=' '.join(LsfWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM), ) DEFAULT_CONF_CONTENTS_LOCAL = ( """ backend=local """ - + CONF_CONTENTS_LOCAL_HASH_STRAT - + CONF_CONTENTS_DB + CONF_CONTENTS_TMP_DIR ) @@ -163,8 +131,6 @@ # ==================================================================== """ + CONF_CONTENTS_SLURM_PARAM - + CONF_CONTENTS_LOCAL_HASH_STRAT - + CONF_CONTENTS_DB + CONF_CONTENTS_TMP_DIR ) @@ -186,8 +152,6 @@ """ + CONF_CONTENTS_SLURM_PARAM - + CONF_CONTENTS_LOCAL_HASH_STRAT - + CONF_CONTENTS_DB + CONF_CONTENTS_TMP_DIR ) @@ -204,8 +168,6 @@ slurm-account= """ + CONF_CONTENTS_SLURM_PARAM - + CONF_CONTENTS_LOCAL_HASH_STRAT - + CONF_CONTENTS_DB + CONF_CONTENTS_TMP_DIR ) @@ -218,8 +180,6 @@ # to fit your cluster's configuration. 
""" + CONF_CONTENTS_SGE_PARAM - + CONF_CONTENTS_LOCAL_HASH_STRAT - + CONF_CONTENTS_DB + CONF_CONTENTS_TMP_DIR ) @@ -228,8 +188,6 @@ backend=pbs """ + CONF_CONTENTS_PBS_PARAM - + CONF_CONTENTS_LOCAL_HASH_STRAT - + CONF_CONTENTS_DB + CONF_CONTENTS_TMP_DIR ) @@ -238,8 +196,6 @@ backend=lsf """ + CONF_CONTENTS_LSF_PARAM - + CONF_CONTENTS_LOCAL_HASH_STRAT - + CONF_CONTENTS_DB + CONF_CONTENTS_TMP_DIR ) @@ -312,10 +268,6 @@ def init_caper_conf(conf_file, backend): """ if backend in (BACKEND_LOCAL, BACKEND_ALIAS_LOCAL): contents = DEFAULT_CONF_CONTENTS_LOCAL - elif backend == BACKEND_ALIAS_SHERLOCK: - contents = DEFAULT_CONF_CONTENTS_SHERLOCK - elif backend == BACKEND_ALIAS_SCG: - contents = DEFAULT_CONF_CONTENTS_SCG elif backend == BACKEND_SLURM: contents = DEFAULT_CONF_CONTENTS_SLURM elif backend == BACKEND_SGE: diff --git a/caper/cli.py b/caper/cli.py index 0802635c..7216e141 100644 --- a/caper/cli.py +++ b/caper/cli.py @@ -5,6 +5,7 @@ import logging import os import re +import subprocess import sys from autouri import GCSURI, AutoURI @@ -24,6 +25,8 @@ from .dict_tool import flatten_dict from .resource_analysis import LinearResourceAnalysis from .server_heartbeat import ServerHeartbeat +from .cli_hpc import subcmd_hpc + logger = logging.getLogger(__name__) @@ -320,6 +323,7 @@ def client(args): raise ValueError('Unsupported client action {act}'.format(act=args.action)) + def subcmd_server(caper_runner, args, nonblocking=False): """ Args: @@ -700,7 +704,8 @@ def main(args=None, nonblocking_server=False): if parsed_args.action == 'init': init_caper_conf(parsed_args.conf, parsed_args.platform) - + elif parsed_args.action in ('hpc'): + return subcmd_hpc(parsed_args) elif parsed_args.action in ('run', 'server'): return runner(parsed_args, nonblocking_server=nonblocking_server) else: diff --git a/caper/cli_hpc.py b/caper/cli_hpc.py new file mode 100644 index 00000000..5f67c9ad --- /dev/null +++ b/caper/cli_hpc.py @@ -0,0 +1,81 @@ +import logging +import sys + +from .hpc import (SlurmWrapper, SgeWrapper, PbsWrapper, LsfWrapper) + +logger = logging.getLogger(__name__) + + +def make_caper_run_command_for_hpc_submit(): + """Makes `caper run ...` command from `caper hpc submit` command by simply + replacing `caper hpc submit` with `caper run`. + This also escapes double quotes in caper run command. + """ + if sys.argv[1] == 'hpc' and sys.argv[2] == 'submit': + # Replace "caper hpc submit" with "caper run" + new_argv = list(sys.argv) + new_argv.pop(2) + new_argv[1] = 'run' + return new_argv + else: + raise ValueError('Wrong HPC command') + + +def subcmd_hpc(args): + if args.hpc_action == 'submit': + + if args.leader_job_name is None: + raise ValueError( + 'Define --leader-job-name [LEADER_JOB_NAME] in the command line arguments.' 
+ ) + caper_run_command = make_caper_run_command_for_hpc_submit() + + if args.backend == 'slurm': + stdout = SlurmWrapper( + args.slurm_leader_job_resource_param.split(), + args.slurm_partition, + args.slurm_account + ).submit(args.leader_job_name, caper_run_command) + + elif args.backend == 'sge': + stdout = SgeWrapper( + args.sge_leader_job_resource_param.split(), + args.sge_queue + ).submit(args.leader_job_name, caper_run_command) + + elif args.backend == 'pbs': + stdout = PbsWrapper( + args.pbs_leader_job_resource_param.split(), + args.pbs_queue + ).submit(args.leader_job_name, caper_run_command) + + elif args.backend == 'lsf': + stdout = LsfWrapper( + args.lsf_leader_job_resource_param.split(), + args.lsf_queue + ).submit(args.leader_job_name, caper_run_command) + + else: + raise ValueError('Unsupported backend {b} for hpc'.format(b=args.backend)) + else: + if args.backend == 'slurm': + hpc_wrapper = SlurmWrapper() + elif args.backend == 'sge': + hpc_wrapper = SgeWrapper() + elif args.backend == 'pbs': + hpc_wrapper = PbsWrapper() + elif args.backend == 'lsf': + hpc_wrapper = LsfWrapper() + else: + raise ValueError('Unsupported backend {b} for hpc'.format(b=args.backend)) + + if args.hpc_action == 'list': + stdout = hpc_wrapper.list() + + elif args.hpc_action == 'abort': + stdout = hpc_wrapper.abort(args.job_ids) + + else: + raise ValueError('Unsupported hpc action {act}'.format(act=args.hpc_action)) + + print(stdout) diff --git a/caper/cromwell.py b/caper/cromwell.py index 84eab5b0..e495b551 100644 --- a/caper/cromwell.py +++ b/caper/cromwell.py @@ -185,7 +185,7 @@ def run( You can simply get it by thread.returnvalue after thread is done. Args: - inputs:. + inputs: input JSON file (-i). options: workflow options JSON file (-o). diff --git a/caper/hpc.py b/caper/hpc.py new file mode 100644 index 00000000..8783d387 --- /dev/null +++ b/caper/hpc.py @@ -0,0 +1,222 @@ +"""Caper's HPC Wrapper based on job engine's CLI (shell command). +e.g. sbatch, squeue, qsub, qstat +""" +import logging +import os +import subprocess +from abc import ABC, abstractmethod +from collections import namedtuple +from pathlib import Path +from tempfile import NamedTemporaryFile + +logger = logging.getLogger(__name__) + +CAPER_LEADER_JOB_NAME_PREFIX = 'CAPER_' +ILLEGAL_CHARS_IN_JOB_NAME = [',', ' ', '\t'] + + +def get_user_from_os_environ(): + return os.environ['USER'] + +def make_bash_script_contents(contents): + return f'#!/bin/bash\n{contents}\n' + +def make_caper_leader_job_name(job_name): + """Check if job name contains Comma, TAB or whitespace. + They are not allowed since they can be used as separators. + """ + if any(illegal_char in job_name for illegal_char in ILLEGAL_CHARS_IN_JOB_NAME): + raise ValueError('Illegal character {chr} in job name {job}'.format( + chr=illegal_chr, job=job_name + )) + return CAPER_LEADER_JOB_NAME_PREFIX + job_name + + +class HpcWrapper(ABC): + def __init__( + self, + leader_job_resource_param=[], + ): + """Base class for HPC job engines. + + Args: + leader_job_resource_param: + List of command line parameters to be passed to + subprocess.Popen(leader_job_resource_param, shell=False, ...). + """ + self.leader_job_resource_param = leader_job_resource_param + + def submit(self, job_name, caper_run_command): + """Submits a caper leader job to HPC (e.g. sbatch, qsub). + Such leader job will be prefixed with CAPER_LEADER_JOB_NAME_PREFIX. + + Returns output STDOUT from submission command. 
+ """ + home_dir = f'{str(Path.home())}{os.sep}' + with NamedTemporaryFile(prefix=home_dir, suffix='.sh') as shell_script: + shell_script.write(make_bash_script_contents(caper_run_command).encode()) + shell_script.flush() + + return self._submit(job_name, shell_script.name) + + def list(self): + """Filters out non-caper jobs from the job list keeping the first line (header). + And then returns output STDOUT. + """ + result = [] + lines = self._list().split('\n') + + # keep header + result.append(lines[0]) + + # filter out non-caper lines + for line in lines[1:]: + if CAPER_LEADER_JOB_NAME_PREFIX in line: + lines.append(line) + + return '\n'.join(lines) + + def abort(self, job_ids): + """Returns output STDOUT from job engine's abort command (e.g. scancel, qdel). + """ + return self._abort(job_ids) + + @abstractmethod + def _submit(self, job_name, shell_script): + pass + + def _list(self): + pass + + @abstractmethod + def _abort(self, job_ids): + pass + + def _run_command(self, command): + """Runs a shell command line and returns STDOUT. + """ + logger.info(f'Running shell command: {" ".join(command)}') + return subprocess.run( + command, + stdout=subprocess.PIPE, + env=os.environ, + ).stdout.decode().strip() + + +class SlurmWrapper(HpcWrapper): + DEFAULT_LEADER_JOB_RESOURCE_PARAM = ['-t', '48:00:00', '--mem', '4G'] + + def __init__( + self, + leader_job_resource_param=DEFAULT_LEADER_JOB_RESOURCE_PARAM, + slurm_partition=None, + slurm_account=None, + ): + super().__init__( + leader_job_resource_param=leader_job_resource_param, + ) + slurm_partition_param = ['-p', slurm_partition] if slurm_partition else [] + slurm_account_param = ['-A', slurm_account] if slurm_account else [] + self.slurm_extra_param = slurm_partition_param + slurm_account_param + + def _submit(self, job_name, shell_script): + command = ['sbatch'] + self.leader_job_resource_param + self.slurm_extra_param + [ + '--export=ALL', '-J', make_caper_leader_job_name(job_name), + shell_script, + ] + return self._run_command(command) + + def _list(self): + return self._run_command([ + 'squeue', '-u', get_user_from_os_environ(), '--Format=JobID,Name,SubmitTime' + ]) + + def _abort(self, job_ids): + return self._run_command(['scancel', '--signal=SIGTERM', '-j'] + job_ids) + + +class SgeWrapper(HpcWrapper): + DEFAULT_LEADER_JOB_RESOURCE_PARAM = ['-l', 'h_rt=48:00:00,h_vmem=4G'] + + def __init__( + self, + leader_job_resource_param=DEFAULT_LEADER_JOB_RESOURCE_PARAM, + sge_queue=None, + ): + super().__init__( + leader_job_resource_param=leader_job_resource_param, + ) + self.sge_queue_param = ['-q', sge_queue] if sge_queue else [] + + def _submit(self, job_name, shell_script): + command = ['qsub'] + self.leader_job_resource_param + self.sge_queue_param + [ + '-V', '-terse', '-N', make_caper_leader_job_name(job_name), + shell_script + ] + return self._run_command(command) + + def _list(self): + return self._run_command([ + 'qstat', '-u', get_user_from_os_environ() + ]) + + def abort(self, job_ids): + return self._run_command(['qdel'] + job_ids) + + +class PbsWrapper(HpcWrapper): + DEFAULT_LEADER_JOB_RESOURCE_PARAM = ['-l', 'walltime=48:00:00,mem=4gb'] + + def __init__( + self, + leader_job_resource_param=DEFAULT_LEADER_JOB_RESOURCE_PARAM, + pbs_queue=None, + ): + super().__init__( + leader_job_resource_param=leader_job_resource_param, + ) + self.pbs_queue_param = ['-q', pbs_queue] if pbs_queue else [] + + def _submit(self, job_name, shell_script): + command = ['qsub'] + self.leader_job_resource_param + self.pbs_queue_param + [ + '-V', '-N', 
make_caper_leader_job_name(job_name), + shell_script + ] + return self._run_command(command) + + def _list(self): + return self._run_command([ + 'qstat', '-u', get_user_from_os_environ() + ]) + + def abort(self, job_ids): + return self._run_command(['qdel'] + job_ids) + + +class LsfWrapper(HpcWrapper): + DEFAULT_LEADER_JOB_RESOURCE_PARAM = ['-W', '2880', '-M', '4g'] + + def __init__( + self, + leader_job_resource_param=DEFAULT_LEADER_JOB_RESOURCE_PARAM, + lsf_queue=None, + ): + super().__init__( + leader_job_resource_param=leader_job_resource_param, + ) + self.lsf_queue_param = ['-q', lsf_queue] if lsf_queue else [] + + def _submit(self, job_name, shell_script): + command = ['bsub'] + self.leader_job_resource_param + self.lsf_queue_param + [ + '-env', 'all', '-J', make_caper_leader_job_name(job_name), + shell_script + ] + return self._run_command(command) + + def _list(self): + return self._run_command([ + 'bjobs', '-u', get_user_from_os_environ() + ]) + + def abort(self, job_ids): + return self._run_command(['bkill'] + job_ids) From 3ba5041ae92939183c0f533e64f97a3a73c869b2 Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Tue, 31 May 2022 14:40:51 -0700 Subject: [PATCH 03/19] make a separate doc for resource-params --- docs/resource_param.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 docs/resource_param.md diff --git a/docs/resource_param.md b/docs/resource_param.md new file mode 100644 index 00000000..146ffaf9 --- /dev/null +++ b/docs/resource_param.md @@ -0,0 +1,12 @@ +# Resource parameters for HPC backends (slurm, sge, pbs, lsf) + +Note that Cromwell's implicit type conversion (`String` to `Integer`) seems to be buggy for WomLong type memory variables (`memory_mb` and `memory_gb`). So be careful about using the `+` operator between `WomLong` and other types (`String`, even `Int`). See https://github.com/broadinstitute/cromwell/issues/4659 + +For example, `${"--mem=" + memory_mb}` will not work since `memory_mb` is `WomLong`. Use `${"if defined(memory_mb) then "--mem=" else ""}{memory_mb}${"if defined(memory_mb) then "mb " else " "}`. + +You can use Cromwell's built-in variables (attributes defined in WDL task's runtime) within Cromwell's `${}` notation. +- `cpu`: Number of cores for a job (default = 1) +- `memory_mb`, `memory_gb`: Total memory for a job in MB or GB. 
These are converted from 'memory' string attribute (including size unit) + defined in WDL task's runtime +- `time`: Time limit for a job in hour +- `gpu`: Specified gpu name or number of gpus (it's declared as String) From e8b2f1005c394d265d0a21278f0cfb444daf2d20 Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Tue, 7 Jun 2022 10:12:51 -0700 Subject: [PATCH 04/19] fix job canceling issue --- caper/cromwell_backend.py | 1 - 1 file changed, 1 deletion(-) diff --git a/caper/cromwell_backend.py b/caper/cromwell_backend.py index 39da19fe..fd063e88 100644 --- a/caper/cromwell_backend.py +++ b/caper/cromwell_backend.py @@ -625,7 +625,6 @@ class CromwellBackendLocal(CromwellBackendBase): 'caching': {'check-sibling-md5': True}, } }, - 'run-in-background': True, 'runtime-attributes': RUNTIME_ATTRIBUTES + RUNTIME_ATTRIBUTES_DOCKER, 'submit': SUBMIT, 'submit-docker': SUBMIT_DOCKER, From a273cb1e14370d78b35feb5db1da74a5edf25158 Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Tue, 7 Jun 2022 10:19:34 -0700 Subject: [PATCH 05/19] proper error handling (java not found) --- caper/cromwell.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/caper/cromwell.py b/caper/cromwell.py index e495b551..7b0c5cb6 100644 --- a/caper/cromwell.py +++ b/caper/cromwell.py @@ -146,14 +146,20 @@ def on_stderr(s): nonlocal stderr stderr += s - th = NBSubprocThread(cmd, cwd=tmp_d, on_stderr=on_stderr, quiet=True) + th = NBSubprocThread(cmd, cwd=tmp_d, on_stderr=on_stderr, quiet=False) th.start() th.join() if th.returncode: - raise WomtoolValidationFailed( - 'RC={rc}\nSTDERR={stderr}'.format(rc=th.returncode, stderr=stderr) - ) + if th.returncode == 127: + raise FileNotFoundError( + 'Java executable not found on your system? ' + 'Please install Java and try again.' + ) + else: + raise WomtoolValidationFailed( + 'RC={rc}\nSTDERR={stderr}'.format(rc=th.returncode, stderr=stderr) + ) logger.info('Passed Womtool validation.') From 90782510b8cafbb82c9d4f5195a1e9510d5dc222 Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Tue, 7 Jun 2022 10:20:41 -0700 Subject: [PATCH 06/19] proper error handling --- caper/nb_subproc_thread.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/caper/nb_subproc_thread.py b/caper/nb_subproc_thread.py index 21033781..db26e21f 100644 --- a/caper/nb_subproc_thread.py +++ b/caper/nb_subproc_thread.py @@ -1,9 +1,10 @@ import logging import time -from signal import SIGTERM +import signal from subprocess import PIPE, Popen from threading import Thread + logger = logging.getLogger(__name__) @@ -14,7 +15,7 @@ def is_fileobj_open(fileobj): class NBSubprocThread(Thread): DEFAULT_POLL_INTERVAL_SEC = 0.01 DEFAULT_SUBPROCESS_NAME = 'Subprocess' - DEFAULT_STOP_SIGNAL = SIGTERM + DEFAULT_STOP_SIGNAL = signal.SIGINT def __init__( self, @@ -141,7 +142,7 @@ def stop(self, stop_signal=DEFAULT_STOP_SIGNAL, wait=False): if wait: if self._returncode is None: logger.info( - '{name} stopped but waiting for graceful shutdown...'.format( + '{name}: waiting for a graceful shutdown...'.format( name=self._subprocess_name ) ) @@ -162,7 +163,6 @@ def _popen( ): """Wrapper for subprocess.Popen(). 
""" - def read_stdout(stdout_bytes): text = stdout_bytes.decode() if text: @@ -212,16 +212,19 @@ def read_from_stderr_obj(stderr): if p.poll() is not None: self._returncode = p.poll() break + if self._stop_it and self._stop_signal: p.send_signal(self._stop_signal) + self._returncode = p.returncode break time.sleep(self._poll_interval) except Exception as e: if not self._quiet: logger.error(e, exc_info=True) + self._returncode = 127 - finally: + else: stdout_bytes, stderr_bytes = p.communicate() read_stdout(stdout_bytes) read_stderr(stderr_bytes) From 23baa994ffad36a973e63531a90c11a9ba770fb0 Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Tue, 7 Jun 2022 10:21:18 -0700 Subject: [PATCH 07/19] fix scancel signal issue (scancel --full) --- caper/hpc.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/caper/hpc.py b/caper/hpc.py index 8783d387..2130b4d1 100644 --- a/caper/hpc.py +++ b/caper/hpc.py @@ -37,12 +37,7 @@ def __init__( self, leader_job_resource_param=[], ): - """Base class for HPC job engines. - - Args: - leader_job_resource_param: - List of command line parameters to be passed to - subprocess.Popen(leader_job_resource_param, shell=False, ...). + """Base class for HPC job engine wrapper. """ self.leader_job_resource_param = leader_job_resource_param @@ -54,7 +49,8 @@ def submit(self, job_name, caper_run_command): """ home_dir = f'{str(Path.home())}{os.sep}' with NamedTemporaryFile(prefix=home_dir, suffix='.sh') as shell_script: - shell_script.write(make_bash_script_contents(caper_run_command).encode()) + contents = make_bash_script_contents(' '.join(caper_run_command)) + shell_script.write(contents.encode()) shell_script.flush() return self._submit(job_name, shell_script.name) @@ -70,11 +66,12 @@ def list(self): result.append(lines[0]) # filter out non-caper lines + logger.info('Filtering out non-caper leader jobs...') for line in lines[1:]: if CAPER_LEADER_JOB_NAME_PREFIX in line: - lines.append(line) + result.append(line) - return '\n'.join(lines) + return '\n'.join(result) def abort(self, job_ids): """Returns output STDOUT from job engine's abort command (e.g. scancel, qdel). @@ -90,6 +87,8 @@ def _list(self): @abstractmethod def _abort(self, job_ids): + """Sends SIGINT to Caper for a graceful shutdown. + """ pass def _run_command(self, command): @@ -128,11 +127,13 @@ def _submit(self, job_name, shell_script): def _list(self): return self._run_command([ - 'squeue', '-u', get_user_from_os_environ(), '--Format=JobID,Name,SubmitTime' + 'squeue', '-u', get_user_from_os_environ(), '--Format=JobID,Name,State,SubmitTime' ]) def _abort(self, job_ids): - return self._run_command(['scancel', '--signal=SIGTERM', '-j'] + job_ids) + """Notes: --full is necessary to correctly send SIGINT to the leader job (Cromwell process). 
+ """ + return self._run_command(['scancel', '--full', '--signal=SIGINT'] + job_ids) class SgeWrapper(HpcWrapper): From 1874235f99c960d35268c256339e635fd35ee26b Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Tue, 7 Jun 2022 12:35:12 -0700 Subject: [PATCH 08/19] bump ver --- caper/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/caper/__init__.py b/caper/__init__.py index 1c6c2639..c45eff1e 100644 --- a/caper/__init__.py +++ b/caper/__init__.py @@ -2,4 +2,4 @@ from .caper_runner import CaperRunner __all__ = ['CaperClient', 'CaperClientSubmit', 'CaperRunner'] -__version__ = '2.1.3' +__version__ = '2.2.0' From 5b75a4c70bb3dfaa1607055c3928981bb1aedf6a Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Wed, 8 Jun 2022 10:12:03 -0700 Subject: [PATCH 09/19] change stop signal for subprocess (sigint -> sigterm) --- caper/nb_subproc_thread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/caper/nb_subproc_thread.py b/caper/nb_subproc_thread.py index db26e21f..7d5e93e3 100644 --- a/caper/nb_subproc_thread.py +++ b/caper/nb_subproc_thread.py @@ -15,7 +15,7 @@ def is_fileobj_open(fileobj): class NBSubprocThread(Thread): DEFAULT_POLL_INTERVAL_SEC = 0.01 DEFAULT_SUBPROCESS_NAME = 'Subprocess' - DEFAULT_STOP_SIGNAL = signal.SIGINT + DEFAULT_STOP_SIGNAL = signal.SIGTERM def __init__( self, From c00145fdf991fc751273f11c9013b678304bd1a6 Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Wed, 8 Jun 2022 10:39:29 -0700 Subject: [PATCH 10/19] better instruction in caper's conf --- caper/caper_init.py | 132 +++++++++++--------------------------------- 1 file changed, 31 insertions(+), 101 deletions(-) diff --git a/caper/caper_init.py b/caper/caper_init.py index 7ea14a1a..37c151ec 100644 --- a/caper/caper_init.py +++ b/caper/caper_init.py @@ -26,28 +26,22 @@ CONF_CONTENTS_TMP_DIR = """ # Local directory for localized files and Cromwell's intermediate files. -# If not defined then Caper will make .caper_tmp/ on `local-out-dir` or CWD. -# /tmp is not recommended here since Caper store all localized data files -# on this directory (e.g. input FASTQs defined as URLs in input JSON). +# If not defined then Caper will make .caper_tmp/ on CWD or `local-out-dir`. +# /tmp is not recommended since Caper store localized data files here. local-loc-dir= """ CONF_CONTENTS_COMMON_RESOURCE_PARAM_HELP = """ -# This parameter is for HPC backends only (slurm, sge, pbs, lsf). +# This parameter defines resource parameters for submitting WDL task to job engine. +# It is for HPC backends only (slurm, sge, pbs and lsf). # It is not recommended to change it unless your cluster has custom resource settings. -# See https://github.com/ENCODE-DCC/caper/blob/master/docs/resource_param.md for details. -""" +# See https://github.com/ENCODE-DCC/caper/blob/master/docs/resource_param.md for details.""" CONF_CONTENTS_SLURM_PARAM = """ +# This parameter defines resource parameters for Caper's leader job only. +slurm-leader-job-resource-param={slurm_leader_job_resource_param} {help_context} slurm-resource-param={slurm_resource_param} - -# This parameter is used for `caper hpc submit` command only. 
-slurm-leader-job-resource-param={slurm_leader_job_resource_param} - -# If needed uncomment and define any extra SLURM sbatch parameters here -# (YOU CANNOT USE WDL SYNTAX AND CROMWELL BUILT-IN VARIABLES HERE) -#slurm-extra-param= """.format( help_context=CONF_CONTENTS_COMMON_RESOURCE_PARAM_HELP, slurm_resource_param=CromwellBackendSlurm.DEFAULT_SLURM_RESOURCE_PARAM, @@ -55,20 +49,15 @@ ) CONF_CONTENTS_SGE_PARAM = """ -{help_context} -sge-resource-param={sge_resource_param} - -# This parameter is used for `caper hpc submit` command only. +# This parameter defines resource parameters for Caper's leader job only. sge-leader-job-resource-param={sge_leader_job_resource_param} # Parallel environment of SGE: # Find one with `$ qconf -spl` or ask you admin to add one if not exists. # If your cluster works without PE then edit the below sge-resource-param sge-pe= - -# If needed uncomment and define any extra SGE qsub parameters here -# (YOU CANNOT USE WDL SYNTAX AND CROMWELL BUILT-IN VARIABLES HERE) -#sge-extra-param= +{help_context} +sge-resource-param={sge_resource_param} """.format( help_context=CONF_CONTENTS_COMMON_RESOURCE_PARAM_HELP, sge_resource_param=CromwellBackendSge.DEFAULT_SGE_RESOURCE_PARAM, @@ -76,15 +65,10 @@ ) CONF_CONTENTS_PBS_PARAM = """ +# This parameter defines resource parameters for Caper's leader job only. +pbs-leader-job-resource-param={pbs_leader_job_resource_param} {help_context} pbs-resource-param={pbs_resource_param} - -# This parameter is used for `caper hpc submit` command only. -pbs-leader-job-resource-param={pbs_leader_job_resource_param} - -# If needed uncomment and define any extra PBS qsub parameters here -# (YOU CANNOT USE WDL SYNTAX AND CROMWELL BUILT-IN VARIABLES HERE) -#pbs-extra-param= """.format( help_context=CONF_CONTENTS_COMMON_RESOURCE_PARAM_HELP, pbs_resource_param=CromwellBackendPbs.DEFAULT_PBS_RESOURCE_PARAM, @@ -92,15 +76,10 @@ ) CONF_CONTENTS_LSF_PARAM = """ +# This parameter defines resource parameters for Caper's leader job only. +lsf-leader-job-resource-param={lsf_leader_job_resource_param} {help_context} lsf-resource-param={lsf_resource_param} - -# This parameter is used for `caper hpc submit` command only. -lsf-leader-job-resource-param={lsf_leader_job_resource_param} - -# If needed uncomment and define any extra LSF bsub parameters here -# (YOU CANNOT USE WDL SYNTAX AND CROMWELL BUILT-IN VARIABLES HERE) -#lsf-extra-param= """.format( help_context=CONF_CONTENTS_COMMON_RESOURCE_PARAM_HELP, lsf_resource_param=CromwellBackendLsf.DEFAULT_LSF_RESOURCE_PARAM, @@ -108,100 +87,54 @@ ) DEFAULT_CONF_CONTENTS_LOCAL = ( - """ -backend=local + """backend=local """ + CONF_CONTENTS_TMP_DIR ) -DEFAULT_CONF_CONTENTS_SHERLOCK = ( - """ -backend=slurm +DEFAULT_CONF_CONTENTS_SLURM = ( + """backend=slurm -# SLURM partition. Define only if required by a cluster. You must define it for Stanford Sherlock. +# SLURM partition. DEFINE ONLY IF REQUIRED BY YOUR CLUSTER'S POLICY. +# You must define it for Stanford Sherlock. slurm-partition= -# IMPORTANT warning for Stanford Sherlock cluster -# ==================================================================== -# DO NOT install any codes/executables/Miniconda -# (java, conda, python, caper, pipeline's WDL, pipeline's Conda env, ...) on $SCRATCH or $OAK. -# You will see Segmentation Fault errors. -# Install all executables on $HOME or $PI_HOME instead. -# It's STILL OKAY to read input data from and write outputs to $SCRATCH or $OAK. 
-# ==================================================================== -""" - + CONF_CONTENTS_SLURM_PARAM - + CONF_CONTENTS_TMP_DIR -) - -DEFAULT_CONF_CONTENTS_SCG = ( - """ -backend=slurm - -# SLURM account. Define only if required by a cluster. You must define it for Stanford SCG. +# SLURM account. DEFINE ONLY IF REQUIRED BY YOUR CLUSTER'S POLICY. +# You must define it for Stanford SCG. slurm-account= - -# IMPORTANT warning for Stanford SCG cluster -# ==================================================================== -# DO NOT install any codes/executables/Miniconda -# (java, conda, python, caper, pipeline's WDL, pipeline's Conda env, ...) on your home (/home/$USER). -# Pipelines will get stuck due to slow filesystem. -# ALSO DO NOT USE /local/scratch to run pipelines. This directory is not static. -# Use $OAK storage to run pipelines, and to store codes/WDLs/executables. -# ==================================================================== - """ - + CONF_CONTENTS_SLURM_PARAM + CONF_CONTENTS_TMP_DIR -) - -DEFAULT_CONF_CONTENTS_SLURM = ( - """ -backend=slurm - -# define one of the followings (or both) according to your -# cluster's SLURM configuration. - -# SLURM partition. Define only if required by a cluster. You must define it for Stanford Sherlock. -slurm-partition= -# SLURM account. Define only if required by a cluster. You must define it for Stanford SCG. -slurm-account= -""" + CONF_CONTENTS_SLURM_PARAM - + CONF_CONTENTS_TMP_DIR ) DEFAULT_CONF_CONTENTS_SGE = ( - """ -backend=sge + """backend=sge # Parallel environement is required, ask your administrator to create one # If your cluster doesn't support PE then edit 'sge-resource-param' # to fit your cluster's configuration. """ - + CONF_CONTENTS_SGE_PARAM + CONF_CONTENTS_TMP_DIR + + CONF_CONTENTS_SGE_PARAM ) DEFAULT_CONF_CONTENTS_PBS = ( - """ -backend=pbs + """backend=pbs """ - + CONF_CONTENTS_PBS_PARAM + CONF_CONTENTS_TMP_DIR + + CONF_CONTENTS_PBS_PARAM ) DEFAULT_CONF_CONTENTS_LSF = ( - """ -backend=lsf + """backend=lsf """ - + CONF_CONTENTS_LSF_PARAM + CONF_CONTENTS_TMP_DIR + + CONF_CONTENTS_LSF_PARAM ) DEFAULT_CONF_CONTENTS_AWS = ( - """ -backend=aws + """backend=aws + # ARN for AWS Batch. aws-batch-arn= # AWS region (e.g. us-west-1) @@ -221,8 +154,8 @@ ) DEFAULT_CONF_CONTENTS_GCP = ( - """ -backend=gcp + """backend=gcp + # Google Cloud Platform Project gcp-prj= # Output bucket path for Google Cloud Platform. This should start with `gs://`. @@ -249,9 +182,6 @@ # e.g. us-west1-a,us-west1-b,us-west1-c gcp-zones= -# Increase instance's memory when retrying upon OOM (out of memory) error. -gcp-memory-retry-multiplier=1.2 - # Number of retrials. This parameter also applies to non-OOM failures. max-retries=1 """ From dc01cb8cffaaebcfb3e4b00ab949fe704bdec1a5 Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Wed, 8 Jun 2022 10:46:36 -0700 Subject: [PATCH 11/19] update docs --- DETAILS.md | 26 +++++++++++---- README.md | 97 ++++++++++++++++++++---------------------------------- 2 files changed, 56 insertions(+), 67 deletions(-) diff --git a/DETAILS.md b/DETAILS.md index 48fc3e5b..4bc2b094 100644 --- a/DETAILS.md +++ b/DETAILS.md @@ -40,13 +40,14 @@ unhold | WF_ID or STR_LABEL |Release hold of workflows on a Cromwell server list | WF_ID or STR_LABEL |List submitted workflows on a Cromwell server metadata | WF_ID or STR_LABEL |Retrieve metadata JSONs for workflows debug, troubleshoot | WF_ID, STR_LABEL or
METADATA_JSON_FILE |Analyze reason for errors +hpc submit| WDL | Submit a Caper leader job to HPC's job engine +hpc list| | List all Caper leader jobs +hpc abort | JOB_ID | Abort a Caper leader job. This will cascade kill all child jobs. * `init`: To initialize Caper on a given platform. This command also downloads Cromwell/Womtool JARs so that Caper can work completely offline with local data files. **Platform**|**Description** :--------|:----- - sherlock | Stanford Sherlock cluster (SLURM) - scg | Stanford SCG cluster (SLURM) gcp | Google Cloud Platform aws | Amazon Web Service local | General local computer @@ -458,7 +459,6 @@ Example: ``` - ## How to override Caper's built-in backend If Caper's built-in backends don't work as expected on your clusters (e.g. due to different resource settings), then you can override built-in backends with your own configuration file (e.g. `your.backend.conf`). Caper generates a `backend.conf` for built-in backends on a temporary directory. @@ -808,9 +808,6 @@ This file DB is genereted on your working directory by default. Its default file Unless you explicitly define `file-db` in your configuration file `~/.caper/default.conf` this file DB name will depend on your input JSON filename. Therefore, you can simply resume a failed workflow with the same command line used for starting a new pipeline. - - - ## Profiling/monitoring resources on Google Cloud A workflow ran with Caper>=1.2.0 on `gcp` backend has a monitoring log (`monitoring.log`) by default on each task's execution directory. This log file includes useful resources data on an instance like used memory, used disk space and total cpu percentage. @@ -833,3 +830,20 @@ Define task's input file variables to limit analysis on specific tasks and input Example plots: - ENCODE ATAC-seq pipeline: [Plot PDF](https://storage.googleapis.com/caper-data/gcp_resource_analysis/example_plot/atac.pdf) + + +## Singularity and Docker Hub pull limit + +If you provide a Singularity image based on docker `docker://` then Caper will locally build a temporary Singularity image (`*.sif`) under `SINGULARITY_CACHEDIR` (defaulting to `~/.singularity/cache` if not defined). However, Singularity will blindly pull from DockerHub to quickly reach [a daily pull limit](https://www.docker.com/increase-rate-limits). It's recommended to use Singularity images from `shub://` (Singularity Hub) or `library://` (Sylabs Cloud). + + + +## How to customize resource parameters for HPCs + +Each HPC backend (`slurm`, `sge`, `pbs` and `lsf`) has its own resource parameter. e.g. `slurm-resource-param`. Find it in Caper's configuration file (`~/.caper/default.conf`) and edit it. For example, the default resource parameter for SLURM looks like the following: +``` +slurm-resource-param=-n 1 --ntasks-per-node=1 --cpus-per-task=${cpu} ${if defined(memory_mb) then "--mem=" else ""}${memory_mb}${if defined(memory_mb) then "M" else ""} ${if defined(time) then "--time=" else ""}${time*60} ${if defined(gpu) then "--gres=gpu:" else ""}${gpu} +``` +This should be a one-liner with WDL syntax allowed in `${}` notation. i.e. Cromwell's built-in resource variables like `cpu`(number of cores for a task), `memory_mb`(total amount of memory for a task in MB), `time`(walltime for a task in hour) and `gpu`(name of gpu unit or number of gpus) in `${}`. See https://github.com/openwdl/wdl/blob/main/versions/1.0/SPEC.md for WDL syntax. 
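For instance, a task whose runtime resolves to `cpu=4`, `memory_mb=8000` and `time=24` with no `gpu` (illustrative values only) would expand the default SLURM resource string above into roughly:

```
-n 1 --ntasks-per-node=1 --cpus-per-task=4 --mem=8000M --time=1440
```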
This line will be formatted with actual resource values by Cromwell and then passed to the submission command such as `sbatch` and `qsub`. + +Note that Cromwell's implicit type conversion (`WomLong` to `String`) seems to be buggy for `WomLong` type memory variables such as `memory_mb` and `memory_gb`. So be careful about using the `+` operator between `WomLong` and other types (`String`, even `Int`). For example, `${"--mem=" + memory_mb}` will not work since `memory_mb` is `WomLong` type. Use `${"if defined(memory_mb) then "--mem=" else ""}{memory_mb}${"if defined(memory_mb) then "mb " else " "}` instead. See https://github.com/broadinstitute/cromwell/issues/4659 for details. diff --git a/README.md b/README.md index d40ea90a..4bd96256 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,9 @@ [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) [![CircleCI](https://circleci.com/gh/ENCODE-DCC/caper.svg?style=svg)](https://circleci.com/gh/ENCODE-DCC/caper) -# Caper - -Caper (Cromwell Assisted Pipeline ExecutoR) is a wrapper Python package for [Cromwell](https://github.com/broadinstitute/cromwell/). ## Introduction -Caper wraps Cromwell to run pipelines on multiple platforms like GCP (Google Cloud Platform), AWS (Amazon Web Service) and HPCs like SLURM, SGE, PBS/Torque and LSF. It provides easier way of running Cromwell server/run modes by automatically composing necessary input files for Cromwell. Caper can run each task on a specified environment (Docker, Singularity or Conda). Also, Caper automatically localizes all files (keeping their directory structure) defined in your input JSON and command line according to the specified backend. For example, if your chosen backend is GCP and files in your input JSON are on S3 buckets (or even URLs) then Caper automatically transfers `s3://` and `http(s)://` files to a specified `gs://` bucket directory. Supported URIs are `s3://`, `gs://`, `http(s)://` and local absolute paths. You can use such URIs either in CLI and input JSON. Private URIs are also accessible if you authenticate using cloud platform CLIs like `gcloud auth`, `aws configure` and using `~/.netrc` for URLs. +Caper (Cromwell Assisted Pipeline ExecutoR) is a wrapper Python package for [Cromwell](https://github.com/broadinstitute/cromwell/). Caper wraps Cromwell to run pipelines on multiple platforms like GCP (Google Cloud Platform), AWS (Amazon Web Service) and HPCs like SLURM, SGE, PBS/Torque and LSF. It provides easier way of running Cromwell server/run modes by automatically composing necessary input files for Cromwell. Caper can run each task on a specified environment (Docker, Singularity or Conda). Also, Caper automatically localizes all files (keeping their directory structure) defined in your input JSON and command line according to the specified backend. For example, if your chosen backend is GCP and files in your input JSON are on S3 buckets (or even URLs) then Caper automatically transfers `s3://` and `http(s)://` files to a specified `gs://` bucket directory. Supported URIs are `s3://`, `gs://`, `http(s)://` and local absolute paths. You can use such URIs either in CLI and input JSON. Private URIs are also accessible if you authenticate using cloud platform CLIs like `gcloud auth`, `aws configure` and using `~/.netrc` for URLs. ## Installation for Google Cloud Platform and AWS @@ -19,16 +16,15 @@ See [this](scripts/gcp_caper_server/README.md) for details. See [this](scripts/aws_caper_server/README.md) for details. 
-## Installation +## Installation for local computers and HPCs 1) Make sure that you have Java (>= 11) and Python>=3.6 installed on your system and `pip` to install Caper. ```bash - $ pip install pip --upgrade $ pip install caper ``` -2) If you see an error message like `caper: command not found` then add the following line to the bottom of `~/.bashrc` and re-login. +2) If you see an error message like `caper: command not found` after installing then add the following line to the bottom of `~/.bashrc` and re-login. ```bash export PATH=$PATH:~/.local/bin @@ -38,19 +34,19 @@ See [this](scripts/aws_caper_server/README.md) for details. **Backend**|**Description** :--------|:----- - local | local computer without cluster engine. - slurm | SLURM cluster (e.g. Stanford Sherlock and SCG). - sge | Sun GridEngine cluster. - pbs | PBS cluster. - lsf | LSF cluster. + local | local computer without a cluster engine + slurm | SLURM (e.g. Stanford Sherlock and SCG) + sge | Sun GridEngine + pbs | PBS cluster + lsf | LSF cluster - > **IMPORTANT**: `sherlock` and `scg` backends have been deprecated. Use `slurm` backend instead and define `slurm-partition=` for Sherlock or `slurm-account=` in Caper's configuration file. + > **IMPORTANT**: `sherlock` and `scg` backends have been deprecated. Use `slurm` backend instead and following instruction comments in the configuration file. ```bash $ caper init [BACKEND] ``` -4) Edit `~/.caper/default.conf` and follow instructions in there. **DO NOT LEAVE ANY PARAMETERS UNDEFINED OR CAPER WILL NOT WORK CORRECTLY** +4) Edit `~/.caper/default.conf` and follow instructions in there. **CAREFULLY READ INSTRUCTION AND DO NOT LEAVE IMPORTANT PARAMETERS UNDEFINED OR CAPER WILL NOT WORK CORRECTLY** ## Docker, Singularity and Conda @@ -59,77 +55,56 @@ For local backends (`local`, `slurm`, `sge`, `pbs` and `lsf`), you can use `--do > **IMPORTANT**: Docker/singularity/conda defined in Caper's configuration file or in CLI (`--docker`, `--singularity` and `--conda`) will be overriden by those defined in WDL task's `runtime`. We provide these parameters to define default/base environment for a pipeline, not to override on WDL task's `runtime`. -For Conda users, make sure that you have installed pipeline's Conda environments before running pipelines. Caper only knows Conda environment's name. You don't need to activate any Conda environment before running a pipeline since Caper will internally run `conda run -n ENV_NAME COMMANDS` for each task. +For Conda users, make sure that you have installed pipeline's Conda environments before running pipelines. Caper only knows Conda environment's name. You don't need to activate any Conda environment before running a pipeline since Caper will internally run `conda run -n ENV_NAME TASK_SHELL_SCRIPT` for each task. Take a look at the following examples: ```bash -$ caper run test.wdl --docker # can be used as a flag too, Caper will find docker image from WDL if defined +$ caper run test.wdl --docker # can be used as a flag too, Caper will find a docker image defined in WDL $ caper run test.wdl --singularity docker://ubuntu:latest +$ caper hpc submit test.wdl --singularity --leader-job-name test1 # submit to job engine and use singularity defined in WDL $ caper submit test.wdl --conda your_conda_env_name # running caper server is required ``` -An environemnt defined here will be overriden by those defined in WDL task's `runtime`. Therefore, think of this as a base/default environment for your pipeline. 
You can define per-task environment in each WDL task's `runtime`. - -For cloud backends (`gcp` and `aws`), you always need to use `--docker` (can be skipped). Caper will automatically try to find a base docker image defined in your WDL. For other pipelines, define a base docker image in Caper's CLI or directly in each WDL task's `runtime`. - - -## Singularity and Docker Hub pull limit - -If you provide a Singularity image based on docker `docker://` then Caper will locally build a temporary Singularity image (`*.sif`) under `SINGULARITY_CACHEDIR` (defaulting to `~/.singularity/cache` if not defined). However, Singularity will blindly pull from DockerHub to quickly reach [a daily pull limit](https://www.docker.com/increase-rate-limits). It's recommended to use Singularity images from `shub://` (Singularity Hub) or `library://` (Sylabs Cloud). - -## Important notes for Conda users - -Since Caper>=2.0 you don't have to activate Conda environment before running pipelines. Caper will internally run `conda run -n ENV_NAME /bin/bash script.sh`. Just make sure that you correctly installed given pipeline's Conda environment(s). - - -## Important notes for Stanford HPC (Sherlock and SCG) users +An environemnt defined here will be overriden by those defined in WDL task's `runtime`. Therefore, think of this as a base/default environment for your pipeline. You can define per-task docker, singularity images to override those defined in Caper's command line. For example: +```wdl +task my_task { + ... + runtime { + docker: "ubuntu:latest" + singularity: "docker://ubuntu:latest" + } +} +``` -**DO NOT INSTALL CAPER, CONDA AND PIPELINE'S WDL ON `$SCRATCH` OR `$OAK` STORAGES**. You will see `Segmentation Fault` errors. Install these executables (Caper, Conda, WDL, ...) on `$HOME` OR `$PI_HOME`. You can still use `$OAK` for input data (e.g. FASTQs defined in your input JSON file) but not for outputs, which means that you should not run Caper on `$OAK`. `$SCRATCH` and `$PI_SCRATCH` are okay for both input and output data so run Caper on them. Running Croo to organize outputs into `$OAK` is okay. +For cloud backends (`gcp` and `aws`), Caper will automatically try to find a base docker image defined in your WDL. For other pipelines, define a base docker image in Caper's CLI or directly in each WDL task's `runtime`. ## Running pipelines on HPCs -Use `--singularity` or `--conda` in CLI to run a pipeline inside Singularity image or Conda environment. Most HPCs do not allow docker. For example, submit `caper run ... --singularity` as a leader job (with long walltime and not-very-big resources like 2 cpus and 5GB of RAM). Then Caper's leader job itself will submit its child jobs to the job engine so that both leader and child jobs can be found with `squeue` or `qstat`. +Use `--singularity` or `--conda` in CLI to run a pipeline inside Singularity image or Conda environment. Most HPCs do not allow docker. For example, `caper hpc submit ... --singularity` will submit Caper process to the job engine as a leader job. Then Caper's leader job will submit its child jobs to the job engine so that both leader and child jobs can be found with `squeue` or `qstat`. + +Use `caper hpc list` to list all leader jobs. Use `caper hpc abort JOB_ID` to abort a running leader job. **DO NOT DIRECTLY CANCEL A JOB USING CLUSTER COMMAND LIKE SCANCEL OR QDEL** then only your leader job will be canceled, not all the child jobs. Here are some example command lines to submit Caper as a leader job. 
Make sure that you correctly configured Caper with `caper init` and filled all parameters in the conf file `~/.caper/default.conf`. -There are extra parameters `--db file --file-db [METADATA_DB_PATH_FOR_CALL_CACHING]` to use call-caching (restarting workflows by re-using previous outputs). If you want to restart a failed workflow then use the same metadata DB path then pipeline will start from where it left off. It will actually start over but will reuse (soft-link) previous outputs. +There is an extra set of parameters `--db file --file-db [METADATA_DB_PATH_FOR_CALL_CACHING]` to use call-caching (restarting workflows by re-using previous outputs). If you want to restart a failed workflow then use the same metadata DB path then pipeline will start from where it left off. It will actually start over but will reuse (soft-link) previous outputs. ```bash -# make a separate directory for each workflow. +# make a new output directory for a workflow. $ cd [OUTPUT_DIR] -# Example for Stanford Sherlock -$ sbatch -p [SLURM_PARTITON] -J [WORKFLOW_NAME] --export=ALL --mem 5G -t 4-0 --wrap "caper run [WDL] -i [INPUT_JSON] --singularity --db file --file-db [METADATA_DB_PATH_FOR_CALL_CACHING]" +# Example for SLURM (e.gl Stanford Sherlock and SCG) and SGE +$ caper hpc submit [WDL] -i [INPUT_JSON] --singularity --db file --file-db [METADATA_DB_PATH_FOR_CALL_CACHING] -# Example for Stanford SCG -$ sbatch -A [SLURM_ACCOUNT] -J [WORKFLOW_NAME] --export=ALL --mem 5G -t 4-0 --wrap "caper run [WDL] -i [INPUT_JSON] --singularity --db file --file-db [METADATA_DB_PATH_FOR_CALL_CACHING]" +# Check status of leader jobs +$ caper hpc list -# Example for General SLURM cluster -$ sbatch -A [SLURM_ACCOUNT_IF_NEEDED] -p [SLURM_PARTITON_IF_NEEDED] -J [WORKFLOW_NAME] --export=ALL --mem 5G -t 4-0 --wrap "caper run [WDL] -i [INPUT_JSON] --singularity --db file --file-db [METADATA_DB_PATH_FOR_CALL_CACHING]" - -# Example for SGE -$ echo "caper run [WDL] -i [INPUT_JSON] --conda --db file --file-db [METADATA_DB_PATH_FOR_CALL_CACHING]" | qsub -V -N [JOB_NAME] -l h_rt=144:00:00 -l h_vmem=3G - -# Check status of leader job -$ squeue -u $USER | grep -v [WORKFLOW_NAME] - -# Kill the leader job then Caper will gracefully shutdown to kill its children. -$ scancel [LEADER_JOB_ID] -``` - - -## How to customize resource parameters for HPCs - -Each HPC backend (`slurm`, `sge`, `pbs` and `lsf`) has its own resource parameter. e.g. `slurm-resource-param`. Find it in Caper's configuration file (`~/.caper/default.conf`) and edit it. For example, the default resource parameter for SLURM looks like the following: -``` -slurm-resource-param=-n 1 --ntasks-per-node=1 --cpus-per-task=${cpu} ${if defined(memory_mb) then "--mem=" else ""}${memory_mb}${if defined(memory_mb) then "M" else ""} ${if defined(time) then "--time=" else ""}${time*60} ${if defined(gpu) then "--gres=gpu:" else ""}${gpu} +# Abort a leader job (this will cascade-kill all its child jobs) +$ caper hpc abort [JOB_ID] ``` -This should be a one-liner with WDL syntax allowed in `${}` notation. i.e. Cromwell's built-in resource variables like `cpu`(number of cores for a task), `memory_mb`(total amount of memory for a task in MB), `time`(walltime for a task in hour) and `gpu`(name of gpu unit or number of gpus) in `${}`. See https://github.com/openwdl/wdl/blob/main/versions/1.0/SPEC.md for WDL syntax. This line will be formatted with actual resource values by Cromwell and then passed to the submission command such as `sbatch` and `qsub`. 
- -Note that Cromwell's implicit type conversion (`WomLong` to `String`) seems to be buggy for `WomLong` type memory variables such as `memory_mb` and `memory_gb`. So be careful about using the `+` operator between `WomLong` and other types (`String`, even `Int`). For example, `${"--mem=" + memory_mb}` will not work since `memory_mb` is `WomLong` type. Use `${"if defined(memory_mb) then "--mem=" else ""}{memory_mb}${"if defined(memory_mb) then "mb " else " "}` instead. See https://github.com/broadinstitute/cromwell/issues/4659 for details. # DETAILS See [details](DETAILS.md). + From 23a82bdd7cdf27d713542561a020d2c062e399b0 Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Wed, 8 Jun 2022 16:44:50 -0700 Subject: [PATCH 12/19] send sigterm instead of sigint --- caper/hpc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/caper/hpc.py b/caper/hpc.py index 2130b4d1..5c8c6f24 100644 --- a/caper/hpc.py +++ b/caper/hpc.py @@ -131,9 +131,9 @@ def _list(self): ]) def _abort(self, job_ids): - """Notes: --full is necessary to correctly send SIGINT to the leader job (Cromwell process). + """Notes: --full is necessary to correctly send SIGTERM to the leader job (Cromwell process). """ - return self._run_command(['scancel', '--full', '--signal=SIGINT'] + job_ids) + return self._run_command(['scancel', '--full', '--signal=SIGTERM'] + job_ids) class SgeWrapper(HpcWrapper): From 79ba6ed9906c93bb2e065705504cc267270d05a7 Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Wed, 8 Jun 2022 16:57:07 -0700 Subject: [PATCH 13/19] better signal propagation to subprocess --- caper/nb_subproc_thread.py | 39 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/caper/nb_subproc_thread.py b/caper/nb_subproc_thread.py index 7d5e93e3..caa81757 100644 --- a/caper/nb_subproc_thread.py +++ b/caper/nb_subproc_thread.py @@ -6,6 +6,24 @@ logger = logging.getLogger(__name__) +interrupted = False +terminated = False + + +def sigterm_handler(signo, frame): + global terminated + logger.info('Received SIGTERM.') + terminated = True + + +def sigint_handler(signo, frame): + global interrupted + logger.info('Received SIGINT.') + interrupted = True + + +signal.signal(signal.SIGTERM, sigterm_handler) +signal.signal(signal.SIGINT, sigint_handler) def is_fileobj_open(fileobj): @@ -81,6 +99,8 @@ def __init__( No logging. subprocess_name: Subprocess name for logging. + signal_handler: + Signal handler for a graceful shutdown. """ super().__init__( target=self._popen, @@ -163,6 +183,9 @@ def _popen( ): """Wrapper for subprocess.Popen(). 
""" + global terminated + global interrupted + def read_stdout(stdout_bytes): text = stdout_bytes.decode() if text: @@ -213,10 +236,22 @@ def read_from_stderr_obj(stderr): self._returncode = p.poll() break - if self._stop_it and self._stop_signal: - p.send_signal(self._stop_signal) + if terminated or interrupted or self._stop_it and self._stop_signal: + if terminated: + stop_signal = signal.SIGTERM + elif interrupted: + stop_signal = signal.SIGINT + else: + stop_signal = self._stop_signal + + logger.info( + f'Sending signal {stop_signal} to subprocess {p.pid}' + ) + p.send_signal(stop_signal) + self._returncode = p.returncode break + time.sleep(self._poll_interval) except Exception as e: From 2f1fcc0b3f81a56d113301b62a55310260572196 Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Thu, 9 Jun 2022 09:55:19 -0700 Subject: [PATCH 14/19] SIGINT is better for graceful shutdown --- caper/hpc.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/caper/hpc.py b/caper/hpc.py index 5c8c6f24..070a9b6e 100644 --- a/caper/hpc.py +++ b/caper/hpc.py @@ -87,7 +87,7 @@ def _list(self): @abstractmethod def _abort(self, job_ids): - """Sends SIGINT to Caper for a graceful shutdown. + """Sends SIGINT (or SIGTERM) to Caper for a graceful shutdown. """ pass @@ -131,9 +131,11 @@ def _list(self): ]) def _abort(self, job_ids): - """Notes: --full is necessary to correctly send SIGTERM to the leader job (Cromwell process). + """Notes: --full is necessary to correctly send SIGINT to the leader job (Cromwell process). + Sending SIGTERM may result in an immediate shutdown of the leaderjob on some clusters. + SIGINT is much better to trigger a graceful shutdown. """ - return self._run_command(['scancel', '--full', '--signal=SIGTERM'] + job_ids) + return self._run_command(['scancel', '--full', '--signal=SIGINT'] + job_ids) class SgeWrapper(HpcWrapper): From b3cd93985823048215d5abbefc6f523bed584f0c Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Thu, 9 Jun 2022 09:55:47 -0700 Subject: [PATCH 15/19] log more info about signaling subprocess --- caper/nb_subproc_thread.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/caper/nb_subproc_thread.py b/caper/nb_subproc_thread.py index caa81757..a56dbafc 100644 --- a/caper/nb_subproc_thread.py +++ b/caper/nb_subproc_thread.py @@ -245,7 +245,8 @@ def read_from_stderr_obj(stderr): stop_signal = self._stop_signal logger.info( - f'Sending signal {stop_signal} to subprocess {p.pid}' + f'Sending signal {stop_signal} to subprocess. ' + f'name: {self._subprocess_name}, pid: {p.pid}' ) p.send_signal(stop_signal) From 238c8f2645b91bc5db1e8cfdcb6150aebc3d759f Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Thu, 9 Jun 2022 14:53:24 -0700 Subject: [PATCH 16/19] update README for hpc --- README.md | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 4bd96256..f3d4aa47 100644 --- a/README.md +++ b/README.md @@ -87,19 +87,32 @@ Use `caper hpc list` to list all leader jobs. Use `caper hpc abort JOB_ID` to ab Here are some example command lines to submit Caper as a leader job. Make sure that you correctly configured Caper with `caper init` and filled all parameters in the conf file `~/.caper/default.conf`. -There is an extra set of parameters `--db file --file-db [METADATA_DB_PATH_FOR_CALL_CACHING]` to use call-caching (restarting workflows by re-using previous outputs). 
If you want to restart a failed workflow then use the same metadata DB path then pipeline will start from where it left off.
+There is an extra set of parameters `--file-db [METADATA_DB_PATH_FOR_CALL_CACHING]` to use call-caching (restarting workflows by re-using previous outputs). If you want to restart a failed workflow, use the same metadata DB path; the pipeline will then start from where it left off. It will actually start over but will reuse (soft-link) previous outputs.

 ```bash
 # make a new output directory for a workflow.
 $ cd [OUTPUT_DIR]

-# Example for SLURM (e.g. Stanford Sherlock and SCG) and SGE
-$ caper hpc submit [WDL] -i [INPUT_JSON] --singularity --db file --file-db [METADATA_DB_PATH_FOR_CALL_CACHING]
+# Example with Singularity without using call-caching.
+$ caper hpc submit [WDL] -i [INPUT_JSON] --singularity --leader-job-name GOOD_NAME1

-# Check status of leader jobs
+# Example with Conda and using call-caching (restarting a workflow from where it left off).
+# Use the same --file-db PATH for the next re-run and Caper will collect and soft-link previous outputs.
+$ caper hpc submit [WDL] -i [INPUT_JSON] --conda --leader-job-name GOOD_NAME2 --db file --file-db [METADATA_DB_PATH]
+
+# List all leader jobs.
 $ caper hpc list
+# Check the leader job's STDOUT file to monitor the workflow's status.
+# Example for SLURM
+$ tail -f slurm-[JOB_ID].out
+
+# Cromwell's log will be written to cromwell.out* in the same directory.
+# It will be helpful for monitoring your workflow in detail.
+$ ls -l cromwell.out*

 # Abort a leader job (this will cascade-kill all its child jobs)
+# If you directly use a job engine's command like scancel or qdel, then child jobs will still remain running.
 $ caper hpc abort [JOB_ID]
 ```

From d0310aa9bca60015b30907a8781797781100a132 Mon Sep 17 00:00:00 2001
From: Jin wook Lee
Date: Thu, 9 Jun 2022 14:53:43 -0700
Subject: [PATCH 17/19] add dev dir to gitignore

---
 .gitignore | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.gitignore b/.gitignore
index ac196a7d..1efe2d6e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -111,3 +111,4 @@ src/test_caper_uri/
 cromwell.out
 dev/
+tests/hpc/

From 8952ee2da355ca28814b6a2e1ee9a4293d4384f6 Mon Sep 17 00:00:00 2001
From: Jin wook Lee
Date: Thu, 9 Jun 2022 16:08:11 -0700
Subject: [PATCH 18/19] make member vars protected

---
 caper/hpc.py | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/caper/hpc.py b/caper/hpc.py
index 070a9b6e..1c6b0f1e 100644
--- a/caper/hpc.py
+++ b/caper/hpc.py
@@ -39,7 +39,7 @@
     ):
         """Base class for HPC job engine wrapper.
         """
-        self.leader_job_resource_param = leader_job_resource_param
+        self._leader_job_resource_param = leader_job_resource_param

     def submit(self, job_name, caper_run_command):
         """Submits a caper leader job to HPC (e.g. sbatch, qsub).
@@ -116,10 +116,10 @@ def __init__( ) slurm_partition_param = ['-p', slurm_partition] if slurm_partition else [] slurm_account_param = ['-A', slurm_account] if slurm_account else [] - self.slurm_extra_param = slurm_partition_param + slurm_account_param + self._slurm_extra_param = slurm_partition_param + slurm_account_param def _submit(self, job_name, shell_script): - command = ['sbatch'] + self.leader_job_resource_param + self.slurm_extra_param + [ + command = ['sbatch'] + self._leader_job_resource_param + self._slurm_extra_param + [ '--export=ALL', '-J', make_caper_leader_job_name(job_name), shell_script, ] @@ -149,10 +149,10 @@ def __init__( super().__init__( leader_job_resource_param=leader_job_resource_param, ) - self.sge_queue_param = ['-q', sge_queue] if sge_queue else [] + self._sge_queue_param = ['-q', sge_queue] if sge_queue else [] def _submit(self, job_name, shell_script): - command = ['qsub'] + self.leader_job_resource_param + self.sge_queue_param + [ + command = ['qsub'] + self._leader_job_resource_param + self._sge_queue_param + [ '-V', '-terse', '-N', make_caper_leader_job_name(job_name), shell_script ] @@ -178,10 +178,10 @@ def __init__( super().__init__( leader_job_resource_param=leader_job_resource_param, ) - self.pbs_queue_param = ['-q', pbs_queue] if pbs_queue else [] + self._pbs_queue_param = ['-q', pbs_queue] if pbs_queue else [] def _submit(self, job_name, shell_script): - command = ['qsub'] + self.leader_job_resource_param + self.pbs_queue_param + [ + command = ['qsub'] + self._leader_job_resource_param + self._pbs_queue_param + [ '-V', '-N', make_caper_leader_job_name(job_name), shell_script ] @@ -193,7 +193,7 @@ def _list(self): ]) def abort(self, job_ids): - return self._run_command(['qdel'] + job_ids) + return self._run_command(['qdel', '-W', '30'] + job_ids) class LsfWrapper(HpcWrapper): @@ -207,10 +207,10 @@ def __init__( super().__init__( leader_job_resource_param=leader_job_resource_param, ) - self.lsf_queue_param = ['-q', lsf_queue] if lsf_queue else [] + self._lsf_queue_param = ['-q', lsf_queue] if lsf_queue else [] def _submit(self, job_name, shell_script): - command = ['bsub'] + self.leader_job_resource_param + self.lsf_queue_param + [ + command = ['bsub'] + self._leader_job_resource_param + self._lsf_queue_param + [ '-env', 'all', '-J', make_caper_leader_job_name(job_name), shell_script ] From 842e84c43bcc6fdf4f4eab4e1ce69583c4c1b262 Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Sun, 12 Jun 2022 21:47:56 -0700 Subject: [PATCH 19/19] fix typo --- caper/hpc.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/caper/hpc.py b/caper/hpc.py index 1c6b0f1e..cb86ef32 100644 --- a/caper/hpc.py +++ b/caper/hpc.py @@ -163,7 +163,7 @@ def _list(self): 'qstat', '-u', get_user_from_os_environ() ]) - def abort(self, job_ids): + def _abort(self, job_ids): return self._run_command(['qdel'] + job_ids) @@ -192,7 +192,7 @@ def _list(self): 'qstat', '-u', get_user_from_os_environ() ]) - def abort(self, job_ids): + def _abort(self, job_ids): return self._run_command(['qdel', '-W', '30'] + job_ids) @@ -221,5 +221,5 @@ def _list(self): 'bjobs', '-u', get_user_from_os_environ() ]) - def abort(self, job_ids): + def _abort(self, job_ids): return self._run_command(['bkill'] + job_ids)
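
To summarize the abort path implemented in `caper/hpc.py` above, `caper hpc abort` roughly maps to the following backend commands (the job ID is illustrative). On SLURM, `--full --signal=SIGINT` is what lets the Caper leader process receive the signal and shut Cromwell down gracefully instead of being killed outright:

```bash
# SLURM: deliver SIGINT to the whole job step tree, including the Caper leader process.
scancel --full --signal=SIGINT 123456

# SGE
qdel 123456

# PBS: -W 30 gives the job 30 seconds before a forced kill.
qdel -W 30 123456

# LSF
bkill 123456
```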