From 19829c4d59f00d82842297f1deb8119a1b5dbdcb Mon Sep 17 00:00:00 2001 From: Jin wook Lee Date: Fri, 4 Nov 2022 10:38:23 -0700 Subject: [PATCH] lint with latest black --- .pre-commit-config.yaml | 13 ++-- README.md | 2 +- caper/arg_tool.py | 1 + caper/caper_args.py | 27 +++---- caper/caper_base.py | 3 +- caper/caper_init.py | 12 --- caper/caper_runner.py | 82 ++++++++++----------- caper/caper_wdl_parser.py | 12 +-- caper/cli.py | 11 +-- caper/cli_hpc.py | 15 ++-- caper/cromwell.py | 7 +- caper/cromwell_backend.py | 53 ++++++-------- caper/cromwell_metadata.py | 24 +++--- caper/cromwell_rest_api.py | 3 +- caper/cromwell_workflow_monitor.py | 9 +-- caper/dict_tool.py | 3 +- caper/hocon_string.py | 3 +- caper/hpc.py | 110 ++++++++++++++++------------ caper/nb_subproc_thread.py | 6 +- caper/wdl_parser.py | 3 +- docs/resource_param.md | 8 +- tests/conftest.py | 3 +- tests/test_caper_wdl_parser.py | 4 +- tests/test_caper_workflow_opts.py | 9 +-- tests/test_cli_run.py | 6 +- tests/test_cli_server_client_gcp.py | 3 +- tests/test_cromwell.py | 3 +- tests/test_cromwell_backend.py | 3 +- tests/test_cromwell_rest_api.py | 3 +- tests/test_hocon_string.py | 3 +- tests/test_resource_analysis.py | 3 +- tests/test_wdl_parser.py | 8 +- 32 files changed, 207 insertions(+), 248 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7b838673..d40f6d75 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,7 @@ --- +repos: - repo: https://github.com/psf/black - rev: 19.3b0 + rev: 22.3.0 hooks: - id: black args: [--skip-string-normalization] @@ -33,8 +34,8 @@ - id: debug-statements - id: check-yaml - - repo: https://github.com/jumanjihouse/pre-commit-hook-yamlfmt - rev: 0.0.10 - hooks: - - id: yamlfmt - args: [--mapping, '2', --sequence, '4', --offset, '2'] +# - repo: https://github.com/jumanjihouse/pre-commit-hook-yamlfmt +# rev: 0.0.10 +# hooks: +# - id: yamlfmt +# args: [--mapping, '2', --sequence, '4', --offset, '2'] diff --git a/README.md b/README.md index b312ec04..38f3eec3 100644 --- a/README.md +++ b/README.md @@ -98,7 +98,7 @@ $ caper hpc submit [WDL] -i [INPUT_JSON] --singularity --leader-job-name GOOD_NA # Example with Conda and using call-caching (restarting a workflow from where it left off) # Use the same --file-db PATH for next re-run then Caper will collect and softlink previous outputs. -$ caper hpc submit [WDL] -i [INPUT_JSON] --conda --leader-job-name GOOD_NAME2 --db file --file-db [METADATA_DB_PATH] +$ caper hpc submit [WDL] -i [INPUT_JSON] --conda --leader-job-name GOOD_NAME2 --db file --file-db [METADATA_DB_PATH] # List all leader jobs. 
$ caper hpc list diff --git a/caper/arg_tool.py b/caper/arg_tool.py index 31801ab5..8774f36b 100644 --- a/caper/arg_tool.py +++ b/caper/arg_tool.py @@ -1,6 +1,7 @@ import os from argparse import ArgumentParser from configparser import ConfigParser, MissingSectionHeaderError + from distutils.util import strtobool diff --git a/caper/caper_args.py b/caper/caper_args.py index 6200a27c..b394f511 100644 --- a/caper/caper_args.py +++ b/caper/caper_args.py @@ -23,14 +23,9 @@ CromwellBackendSlurm, ) from .cromwell_rest_api import CromwellRestAPI +from .hpc import LsfWrapper, PbsWrapper, SgeWrapper, SlurmWrapper from .resource_analysis import ResourceAnalysis from .server_heartbeat import ServerHeartbeat -from .hpc import ( - SlurmWrapper, - SgeWrapper, - PbsWrapper, - LsfWrapper, -) DEFAULT_CAPER_CONF = '~/.caper/default.conf' DEFAULT_LIST_FORMAT = 'id,status,name,str_label,user,parent,submission' @@ -163,7 +158,7 @@ def get_parser_and_defaults(conf_file=None): ) group_db.add_argument( '--db', - default=CromwellBackendDatabase.DEFAULT_DB, + default=CromwellBackendDatabase.DB_FILE, help='Cromwell metadata database type', ) group_db.add_argument( @@ -534,31 +529,31 @@ def get_parser_and_defaults(conf_file=None): '--leader-job-name', help='Leader job name for a submitted workflow.' 'This name will be appended to the prefix "CAPER_LEADER_" and then ' - 'submitted to HPC. Such prefix is used to identify Caper leader jobs.' + 'submitted to HPC. Such prefix is used to identify Caper leader jobs.', ) group_hpc_submit.add_argument( '--slurm-leader-job-resource-param', help='Resource parameters to submit a Caper leader job to SLURM. ' 'Make sure to quote if you use it in the command line arguments.', - default=' '.join(SlurmWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM) + default=' '.join(SlurmWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM), ) group_hpc_submit.add_argument( '--sge-leader-job-resource-param', help='Resource parameters to submit a Caper leader job to SGE' 'Make sure to quote if you use it in the command line arguments.', - default=' '.join(SgeWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM) + default=' '.join(SgeWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM), ) group_hpc_submit.add_argument( '--pbs-leader-job-resource-param', help='Resource parameters to submit a Caper leader job to PBS' 'Make sure to quote if you use it in the command line arguments.', - default=' '.join(PbsWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM) + default=' '.join(PbsWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM), ) group_hpc_submit.add_argument( '--lsf-leader-job-resource-param', help='Resource parameters to submit a Caper leader job to LSF' 'Make sure to quote if you use it in the command line arguments.', - default=' '.join(LsfWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM) + default=' '.join(LsfWrapper.DEFAULT_LEADER_JOB_RESOURCE_PARAM), ) group_slurm = parent_submit.add_argument_group('SLURM arguments') @@ -771,7 +766,7 @@ def get_parser_and_defaults(conf_file=None): parent_hpc_abort.add_argument( 'job_ids', nargs='+', - help='Job ID or list of job IDs to abort matching Caper leader jobs.' 
+ help='Job ID or list of job IDs to abort matching Caper leader jobs.', ) # all subcommands @@ -864,18 +859,18 @@ def get_parser_and_defaults(conf_file=None): parents=[parent_all], ) subparser_hpc = p_hpc.add_subparsers(dest='hpc_action') - p_hpc_submit = subparser_hpc.add_parser( + subparser_hpc.add_parser( 'submit', help='Submit a single workflow to HPC.', parents=[parent_all, parent_submit, parent_run, parent_runner, parent_backend], ) - p_hpc_list = subparser_hpc.add_parser( + subparser_hpc.add_parser( 'list', help='List all workflows submitted to HPC.', parents=[parent_all, parent_backend], ) - p_hpc_abort = subparser_hpc.add_parser( + subparser_hpc.add_parser( 'abort', help='Abort a workflow submitted to HPC.', parents=[parent_all, parent_backend, parent_hpc_abort], diff --git a/caper/caper_base.py b/caper/caper_base.py index 2292c044..a163533c 100644 --- a/caper/caper_base.py +++ b/caper/caper_base.py @@ -185,8 +185,7 @@ def create_timestamped_work_dir(self, prefix=''): return work_dir def get_loc_dir(self, backend): - """Get localization directory for a backend. - """ + """Get localization directory for a backend.""" if backend == BACKEND_GCP: return self._gcp_loc_dir elif backend == BACKEND_AWS: diff --git a/caper/caper_init.py b/caper/caper_init.py index 656f1b36..89fc6fb8 100644 --- a/caper/caper_init.py +++ b/caper/caper_init.py @@ -10,20 +10,8 @@ BACKEND_PBS, BACKEND_SGE, BACKEND_SLURM, - CromwellBackendLsf, - CromwellBackendPbs, - CromwellBackendSge, - CromwellBackendSlurm, ) -from .hpc import ( - SlurmWrapper, - SgeWrapper, - PbsWrapper, - LsfWrapper, -) - - CONF_CONTENTS_TMP_DIR = """ # Local directory for localized files and Cromwell's intermediate files. # If not defined then Caper will make .caper_tmp/ on CWD or `local-out-dir`. diff --git a/caper/caper_runner.py b/caper/caper_runner.py index 0d3ba865..1deb684c 100644 --- a/caper/caper_runner.py +++ b/caper/caper_runner.py @@ -495,47 +495,47 @@ def server( dry_run=False, ): """Run a Cromwell server. - default_backend: - Default backend. If backend is not specified for a submitted workflow - then default backend will be used. - Choose among Caper's built-in backends. - (aws, gcp, Local, slurm, sge, pbs, lsf). - Or use a backend defined in your custom backend config file - (above "backend_conf" file). - server_heartbeat: - Server heartbeat to write hostname/port of a server. - server_port: - Server port to run Cromwell server. - Make sure to use different port for multiple Cromwell servers on the same - machine. - server_hostname: - Server hostname. If not defined then socket.gethostname() will be used. - If server_heartbeat is given, then this hostname will be written to - the server heartbeat file defined in server_heartbeat. - custom_backend_conf: - Backend config file (HOCON) to override Caper's auto-generated backend config. - fileobj_stdout: - File-like object to write Cromwell's STDOUT. - embed_subworkflow: - Caper stores/updates metadata.JSON file on - each workflow's root directory whenever there is status change - of workflow (or its tasks). - This flag ensures that any subworkflow's metadata JSON will be - embedded in main (this) workflow's metadata JSON. - This is to mimic behavior of Cromwell run mode's -m parameter. - java_heap_server: - Java heap (java -Xmx) for Cromwell server mode. - auto_write_metadata: - Automatic retrieval/writing of metadata.json upon workflow/task's status change. - work_dir: - Local temporary directory to store all temporary files. 
- Temporary files mean intermediate files used for running Cromwell. - For example, auto-generated backend config file and workflow options file. - If this is not defined, then cache directory self._local_loc_dir with a timestamp - will be used. - However, Cromwell Java process itself will run on CWD instead of this directory. - dry_run: - Stop before running Java command line for Cromwell. + default_backend: + Default backend. If backend is not specified for a submitted workflow + then default backend will be used. + Choose among Caper's built-in backends. + (aws, gcp, Local, slurm, sge, pbs, lsf). + Or use a backend defined in your custom backend config file + (above "backend_conf" file). + server_heartbeat: + Server heartbeat to write hostname/port of a server. + server_port: + Server port to run Cromwell server. + Make sure to use different port for multiple Cromwell servers on the same + machine. + server_hostname: + Server hostname. If not defined then socket.gethostname() will be used. + If server_heartbeat is given, then this hostname will be written to + the server heartbeat file defined in server_heartbeat. + custom_backend_conf: + Backend config file (HOCON) to override Caper's auto-generated backend config. + fileobj_stdout: + File-like object to write Cromwell's STDOUT. + embed_subworkflow: + Caper stores/updates metadata.JSON file on + each workflow's root directory whenever there is status change + of workflow (or its tasks). + This flag ensures that any subworkflow's metadata JSON will be + embedded in main (this) workflow's metadata JSON. + This is to mimic behavior of Cromwell run mode's -m parameter. + java_heap_server: + Java heap (java -Xmx) for Cromwell server mode. + auto_write_metadata: + Automatic retrieval/writing of metadata.json upon workflow/task's status change. + work_dir: + Local temporary directory to store all temporary files. + Temporary files mean intermediate files used for running Cromwell. + For example, auto-generated backend config file and workflow options file. + If this is not defined, then cache directory self._local_loc_dir with a timestamp + will be used. + However, Cromwell Java process itself will run on CWD instead of this directory. + dry_run: + Stop before running Java command line for Cromwell. """ if work_dir is None: work_dir = self.create_timestamped_work_dir( diff --git a/caper/caper_wdl_parser.py b/caper/caper_wdl_parser.py index 1b573bbc..2df298d5 100644 --- a/caper/caper_wdl_parser.py +++ b/caper/caper_wdl_parser.py @@ -6,8 +6,7 @@ class CaperWDLParser(WDLParser): - """WDL parser for Caper. - """ + """WDL parser for Caper.""" RE_WDL_COMMENT_DOCKER = r'^\s*\#\s*CAPER\s+docker\s(.+)' RE_WDL_COMMENT_SINGULARITY = r'^\s*\#\s*CAPER\s+singularity\s(.+)' @@ -25,8 +24,7 @@ def __init__(self, wdl): @property def caper_docker(self): - """Backward compatibility for property name. See property default_docker. - """ + """Backward compatibility for property name. See property default_docker.""" return self.default_docker @property @@ -48,8 +46,7 @@ def default_docker(self): @property def caper_singularity(self): - """Backward compatibility for property name. See property default_singularity. - """ + """Backward compatibility for property name. See property default_singularity.""" return self.default_singularity @property @@ -71,8 +68,7 @@ def default_singularity(self): @property def default_conda(self): - """Find a default Conda environment name in WDL for Caper. 
- """ + """Find a default Conda environment name in WDL for Caper.""" if self.workflow_meta: for conda_key in CaperWDLParser.WDL_WORKFLOW_META_CONDA_KEYS: if conda_key in self.workflow_meta: diff --git a/caper/cli.py b/caper/cli.py index 28d2d644..a69f5768 100644 --- a/caper/cli.py +++ b/caper/cli.py @@ -5,7 +5,6 @@ import logging import os import re -import subprocess import sys from autouri import GCSURI, AutoURI @@ -16,6 +15,7 @@ from .caper_init import init_caper_conf from .caper_labels import CaperLabels from .caper_runner import CaperRunner +from .cli_hpc import subcmd_hpc from .cromwell_backend import ( BACKEND_ALIAS_LOCAL, BACKEND_LOCAL, @@ -25,8 +25,6 @@ from .dict_tool import flatten_dict from .resource_analysis import LinearResourceAnalysis from .server_heartbeat import ServerHeartbeat -from .cli_hpc import subcmd_hpc - logger = logging.getLogger(__name__) @@ -326,7 +324,6 @@ def client(args): raise ValueError('Unsupported client action {act}'.format(act=args.action)) - def subcmd_server(caper_runner, args, nonblocking=False): """ Args: @@ -541,8 +538,7 @@ def get_single_cromwell_metadata_obj(caper_client, args, subcmd): def split_list_into_file_and_non_file(lst): - """Returns tuple of (list of existing files, list of non-file strings) - """ + """Returns tuple of (list of existing files, list of non-file strings)""" files = [] non_files = [] @@ -666,8 +662,7 @@ def subcmd_gcp_res_analysis(caper_client, args): def subcmd_cleanup(caper_client, args): - """Cleanup outputs of a workflow. - """ + """Cleanup outputs of a workflow.""" cm = get_single_cromwell_metadata_obj(caper_client, args, 'cleanup') cm.cleanup(dry_run=not args.delete, num_threads=args.num_threads, no_lock=True) if not args.delete: diff --git a/caper/cli_hpc.py b/caper/cli_hpc.py index 5f67c9ad..b4bb4578 100644 --- a/caper/cli_hpc.py +++ b/caper/cli_hpc.py @@ -1,7 +1,7 @@ import logging import sys -from .hpc import (SlurmWrapper, SgeWrapper, PbsWrapper, LsfWrapper) +from .hpc import LsfWrapper, PbsWrapper, SgeWrapper, SlurmWrapper logger = logging.getLogger(__name__) @@ -34,25 +34,22 @@ def subcmd_hpc(args): stdout = SlurmWrapper( args.slurm_leader_job_resource_param.split(), args.slurm_partition, - args.slurm_account + args.slurm_account, ).submit(args.leader_job_name, caper_run_command) elif args.backend == 'sge': stdout = SgeWrapper( - args.sge_leader_job_resource_param.split(), - args.sge_queue + args.sge_leader_job_resource_param.split(), args.sge_queue ).submit(args.leader_job_name, caper_run_command) - + elif args.backend == 'pbs': stdout = PbsWrapper( - args.pbs_leader_job_resource_param.split(), - args.pbs_queue + args.pbs_leader_job_resource_param.split(), args.pbs_queue ).submit(args.leader_job_name, caper_run_command) elif args.backend == 'lsf': stdout = LsfWrapper( - args.lsf_leader_job_resource_param.split(), - args.lsf_queue + args.lsf_leader_job_resource_param.split(), args.lsf_queue ).submit(args.leader_job_name, caper_run_command) else: diff --git a/caper/cromwell.py b/caper/cromwell.py index 2f3a0fa9..02a04480 100644 --- a/caper/cromwell.py +++ b/caper/cromwell.py @@ -34,8 +34,7 @@ def install_file(f, install_dir, label): class Cromwell: - """Wraps Cromwell/Womtool. 
- """ + """Wraps Cromwell/Womtool.""" DEFAULT_CROMWELL = 'https://github.com/broadinstitute/cromwell/releases/download/82/cromwell-82.jar' DEFAULT_WOMTOOL = ( @@ -158,7 +157,9 @@ def on_stderr(s): ) else: raise WomtoolValidationFailed( - 'RC={rc}\nSTDERR={stderr}'.format(rc=th.returncode, stderr=stderr) + 'RC={rc}\nSTDERR={stderr}'.format( + rc=th.returncode, stderr=stderr + ) ) logger.info('Passed Womtool validation.') diff --git a/caper/cromwell_backend.py b/caper/cromwell_backend.py index eccf86ea..43f8ed5f 100644 --- a/caper/cromwell_backend.py +++ b/caper/cromwell_backend.py @@ -47,8 +47,7 @@ def get_s3_bucket_name(s3_uri): class CromwellBackendCommon(UserDict): - """Basic stanzas for Cromwell backend conf. - """ + """Basic stanzas for Cromwell backend conf.""" TEMPLATE = { 'backend': {}, @@ -105,8 +104,7 @@ def __init__( class CromwellBackendServer(UserDict): - """Stanzas for Cromwell server. - """ + """Stanzas for Cromwell server.""" TEMPLATE = {'webservice': {}} @@ -119,8 +117,7 @@ def __init__(self, server_port=DEFAULT_SERVER_PORT): class CromwellBackendDatabase(UserDict): - """Common stanzas for Cromwell's metadata database. - """ + """Common stanzas for Cromwell's metadata database.""" TEMPLATE = {'database': {'db': {'connectionTimeout': 5000, 'numThreads': 1}}} @@ -219,8 +216,7 @@ def __init__( class CromwellBackendBase(UserDict): - """Base skeleton backend for all backends - """ + """Base skeleton backend for all backends""" TEMPLATE = {'backend': {'providers': {}}} TEMPLATE_BACKEND = {'config': {'default-runtime-attributes': {}, 'filesystems': {}}} @@ -286,16 +282,13 @@ def backend_config(self): @property def default_runtime_attributes(self): - """Backend's default runtime attributes in self.backend_config. - """ + """Backend's default runtime attributes in self.backend_config.""" return self.backend_config['default-runtime-attributes'] class CromwellBackendGcp(CromwellBackendBase): TEMPLATE = { - 'google': { - 'application-name': 'cromwell' - }, + 'google': {'application-name': 'cromwell'}, 'engine': {'filesystems': {FILESYSTEM_GCS: {'auth': 'default'}}}, } TEMPLATE_BACKEND = { @@ -390,7 +383,9 @@ def __init__( self['google']['auths'] = [ {'name': 'application-default', 'scheme': 'application_default'} ] - self['engine']['filesystems'][FILESYSTEM_GCS]['auth'] = 'application-default' + self['engine']['filesystems'][FILESYSTEM_GCS][ + 'auth' + ] = 'application-default' if use_google_cloud_life_sciences: self.backend['actor-factory'] = CromwellBackendGcp.ACTOR_FACTORY_V2BETA @@ -490,26 +485,26 @@ def __init__( class CromwellBackendLocal(CromwellBackendBase): """Class constants: - SUBMIT_DOCKER: - Cromwell falls back to 'submit_docker' instead of 'submit' if WDL task has - 'docker' in runtime and runtime-attributes are declared in backend's config. + SUBMIT_DOCKER: + Cromwell falls back to 'submit_docker' instead of 'submit' if WDL task has + 'docker' in runtime and runtime-attributes are declared in backend's config. - Docker and Singularity can map paths between inside and outside of the container. - So this is not an issue for those container environments. + Docker and Singularity can map paths between inside and outside of the container. + So this is not an issue for those container environments. - For Conda, any container paths (docker_cwd, e.g. /cromwell-executions/**) - in the script is simply replaced with CWD. + For Conda, any container paths (docker_cwd, e.g. /cromwell-executions/**) + in the script is simply replaced with CWD. 
- This also replaces filenames written in write_*.tsv files (globbed by WDL functions). - e.g. write_lines(), write_tsv(), ... + This also replaces filenames written in write_*.tsv files (globbed by WDL functions). + e.g. write_lines(), write_tsv(), ... - See the following document for such WDL functions: - https://github.com/openwdl/wdl/blob/main/versions/development/SPEC.md#file-write_linesarraystring + See the following document for such WDL functions: + https://github.com/openwdl/wdl/blob/main/versions/development/SPEC.md#file-write_linesarraystring - Possible issue: - - 'sed' is used here with a delimiter as hash mark (#) - so hash marks in output path can result in error. - - Files globbed by WDL functions other than write_*() will still have paths inside a container. + Possible issue: + - 'sed' is used here with a delimiter as hash mark (#) + so hash marks in output path can result in error. + - Files globbed by WDL functions other than write_*() will still have paths inside a container. """ RUNTIME_ATTRIBUTES = dedent( diff --git a/caper/cromwell_metadata.py b/caper/cromwell_metadata.py index dbbf2fcf..fd41fc59 100644 --- a/caper/cromwell_metadata.py +++ b/caper/cromwell_metadata.py @@ -27,8 +27,7 @@ def get_workflow_id_from_workflow_root(workflow_root): def parse_cromwell_disks(s): - """Parses Cromwell's disks in runtime attribute. - """ + """Parses Cromwell's disks in runtime attribute.""" if s: m = re.findall(r'(\d+)', s) if m: @@ -36,15 +35,13 @@ def parse_cromwell_disks(s): def parse_cromwell_memory(s): - """Parses Cromwell's memory runtime attribute. - """ + """Parses Cromwell's memory runtime attribute.""" if s: return humanfriendly.parse_size(s) def convert_type_np_to_py(o): - """Convert numpy type to Python type. - """ + """Convert numpy type to Python type.""" if isinstance(o, np.generic): return o.item() raise TypeError @@ -55,8 +52,7 @@ class CromwellMetadata: DEFAULT_GCP_MONITOR_STAT_METHODS = ('mean', 'std', 'max', 'min', 'last') def __init__(self, metadata): - """Parses metadata JSON (dict) object or file. - """ + """Parses metadata JSON (dict) object or file.""" if isinstance(metadata, dict): self._metadata = metadata elif isinstance(metadata, CromwellMetadata): @@ -159,8 +155,7 @@ def recurse_calls(self, fn_call, parent_call_names=tuple()): yield fn_call(call_name, call, parent_call_names) def write_on_workflow_root(self, basename=DEFAULT_METADATA_BASENAME): - """Update metadata JSON file on metadata's output root directory. - """ + """Update metadata JSON file on metadata's output root directory.""" root = self.workflow_root if root: @@ -184,8 +179,10 @@ def troubleshoot(self, show_completed_task=False, show_stdout=False): result: Troubleshooting report as a plain string. """ - result = '* Started troubleshooting workflow: id={id}, status={status}\n'.format( - id=self.workflow_id, status=self.workflow_status + result = ( + '* Started troubleshooting workflow: id={id}, status={status}\n'.format( + id=self.workflow_id, status=self.workflow_status + ) ) if self.workflow_status == 'Succeeded': @@ -198,8 +195,7 @@ def troubleshoot(self, show_completed_task=False, show_stdout=False): ) def troubleshoot_call(call_name, call, parent_call_names): - """Returns troubleshooting help message. 
- """ + """Returns troubleshooting help message.""" nonlocal show_completed_task nonlocal show_stdout status = call.get('executionStatus') diff --git a/caper/cromwell_rest_api.py b/caper/cromwell_rest_api.py index 7900cb85..deb3ba24 100644 --- a/caper/cromwell_rest_api.py +++ b/caper/cromwell_rest_api.py @@ -493,8 +493,7 @@ def find(self, workflow_ids=None, labels=None, exclude_subworkflow=True): return result def __init_auth(self): - """Init auth object - """ + """Init auth object""" if self._user is not None and self._password is not None: self._auth = (self._user, self._password) else: diff --git a/caper/cromwell_workflow_monitor.py b/caper/cromwell_workflow_monitor.py index 467ea903..a4031595 100644 --- a/caper/cromwell_workflow_monitor.py +++ b/caper/cromwell_workflow_monitor.py @@ -224,8 +224,7 @@ def _update_server_start(self, stderr): break def _update_workflows(self, stderr): - """Updates workflow status by parsing Cromwell's stderr lines. - """ + """Updates workflow status by parsing Cromwell's stderr lines.""" updated_workflows = set() workflows_to_write_metadata = set() for line in stderr.split('\n'): @@ -251,8 +250,7 @@ def _update_subworkflows(self, stderr): self._subworkflows.add(subworkflow_id) def _update_tasks(self, stderr): - """Check if workflow's task status changed by parsing Cromwell's stderr lines. - """ + """Check if workflow's task status changed by parsing Cromwell's stderr lines.""" for line in stderr.split('\n'): r_common = None r_start = re.findall(CromwellWorkflowMonitor.RE_TASK_START, line) @@ -303,8 +301,7 @@ def _find_workflow_id_from_short_id(self, short_id): return w def _write_metadata(self, workflow_id): - """Update metadata on Cromwell'e exec root. - """ + """Update metadata on Cromwell'e exec root.""" if not self._is_server or not self._auto_write_metadata: return if workflow_id in self._subworkflows and self._embed_subworkflow: diff --git a/caper/dict_tool.py b/caper/dict_tool.py index f5cf0109..d43c309a 100644 --- a/caper/dict_tool.py +++ b/caper/dict_tool.py @@ -72,8 +72,7 @@ def recurse_dict_value(d, fnc): def unflatten_dict(d_flat): - """Unflattens single-level-tuple-keyed dict into dict - """ + """Unflattens single-level-tuple-keyed dict into dict""" result = type(d_flat)() for k_tuple, v in d_flat.items(): d_curr = result diff --git a/caper/hocon_string.py b/caper/hocon_string.py index 09112d92..bcac2218 100644 --- a/caper/hocon_string.py +++ b/caper/hocon_string.py @@ -39,8 +39,7 @@ def is_valid_include(include): def get_include_key(include_str): - """Use md5sum hash of the whole include statement string for a key. - """ + """Use md5sum hash of the whole include statement string for a key.""" return hashlib.md5(include_str.encode()).hexdigest() diff --git a/caper/hpc.py b/caper/hpc.py index cb86ef32..81c6dfba 100644 --- a/caper/hpc.py +++ b/caper/hpc.py @@ -5,7 +5,6 @@ import os import subprocess from abc import ABC, abstractmethod -from collections import namedtuple from pathlib import Path from tempfile import NamedTemporaryFile @@ -18,17 +17,22 @@ def get_user_from_os_environ(): return os.environ['USER'] + def make_bash_script_contents(contents): return f'#!/bin/bash\n{contents}\n' + def make_caper_leader_job_name(job_name): """Check if job name contains Comma, TAB or whitespace. They are not allowed since they can be used as separators. 
""" - if any(illegal_char in job_name for illegal_char in ILLEGAL_CHARS_IN_JOB_NAME): - raise ValueError('Illegal character {chr} in job name {job}'.format( - chr=illegal_chr, job=job_name - )) + for illegal_char in ILLEGAL_CHARS_IN_JOB_NAME: + if illegal_char in job_name: + raise ValueError( + 'Illegal character {chr} in job name {job}'.format( + chr=illegal_char, job=job_name + ) + ) return CAPER_LEADER_JOB_NAME_PREFIX + job_name @@ -37,8 +41,7 @@ def __init__( self, leader_job_resource_param=[], ): - """Base class for HPC job engine wrapper. - """ + """Base class for HPC job engine wrapper.""" self._leader_job_resource_param = leader_job_resource_param def submit(self, job_name, caper_run_command): @@ -74,12 +77,11 @@ def list(self): return '\n'.join(result) def abort(self, job_ids): - """Returns output STDOUT from job engine's abort command (e.g. scancel, qdel). - """ + """Returns output STDOUT from job engine's abort command (e.g. scancel, qdel).""" return self._abort(job_ids) @abstractmethod - def _submit(self, job_name, shell_script): + def _submit(self, job_name, shell_script): pass def _list(self): @@ -87,19 +89,21 @@ def _list(self): @abstractmethod def _abort(self, job_ids): - """Sends SIGINT (or SIGTERM) to Caper for a graceful shutdown. - """ + """Sends SIGINT (or SIGTERM) to Caper for a graceful shutdown.""" pass def _run_command(self, command): - """Runs a shell command line and returns STDOUT. - """ + """Runs a shell command line and returns STDOUT.""" logger.info(f'Running shell command: {" ".join(command)}') - return subprocess.run( - command, - stdout=subprocess.PIPE, - env=os.environ, - ).stdout.decode().strip() + return ( + subprocess.run( + command, + stdout=subprocess.PIPE, + env=os.environ, + ) + .stdout.decode() + .strip() + ) class SlurmWrapper(HpcWrapper): @@ -119,16 +123,28 @@ def __init__( self._slurm_extra_param = slurm_partition_param + slurm_account_param def _submit(self, job_name, shell_script): - command = ['sbatch'] + self._leader_job_resource_param + self._slurm_extra_param + [ - '--export=ALL', '-J', make_caper_leader_job_name(job_name), - shell_script, - ] + command = ( + ['sbatch'] + + self._leader_job_resource_param + + self._slurm_extra_param + + [ + '--export=ALL', + '-J', + make_caper_leader_job_name(job_name), + shell_script, + ] + ) return self._run_command(command) def _list(self): - return self._run_command([ - 'squeue', '-u', get_user_from_os_environ(), '--Format=JobID,Name,State,SubmitTime' - ]) + return self._run_command( + [ + 'squeue', + '-u', + get_user_from_os_environ(), + '--Format=JobID,Name,State,SubmitTime', + ] + ) def _abort(self, job_ids): """Notes: --full is necessary to correctly send SIGINT to the leader job (Cromwell process). 
@@ -152,16 +168,16 @@ def __init__( self._sge_queue_param = ['-q', sge_queue] if sge_queue else [] def _submit(self, job_name, shell_script): - command = ['qsub'] + self._leader_job_resource_param + self._sge_queue_param + [ - '-V', '-terse', '-N', make_caper_leader_job_name(job_name), - shell_script - ] + command = ( + ['qsub'] + + self._leader_job_resource_param + + self._sge_queue_param + + ['-V', '-terse', '-N', make_caper_leader_job_name(job_name), shell_script] + ) return self._run_command(command) def _list(self): - return self._run_command([ - 'qstat', '-u', get_user_from_os_environ() - ]) + return self._run_command(['qstat', '-u', get_user_from_os_environ()]) def _abort(self, job_ids): return self._run_command(['qdel'] + job_ids) @@ -181,16 +197,16 @@ def __init__( self._pbs_queue_param = ['-q', pbs_queue] if pbs_queue else [] def _submit(self, job_name, shell_script): - command = ['qsub'] + self._leader_job_resource_param + self._pbs_queue_param + [ - '-V', '-N', make_caper_leader_job_name(job_name), - shell_script - ] + command = ( + ['qsub'] + + self._leader_job_resource_param + + self._pbs_queue_param + + ['-V', '-N', make_caper_leader_job_name(job_name), shell_script] + ) return self._run_command(command) def _list(self): - return self._run_command([ - 'qstat', '-u', get_user_from_os_environ() - ]) + return self._run_command(['qstat', '-u', get_user_from_os_environ()]) def _abort(self, job_ids): return self._run_command(['qdel', '-W', '30'] + job_ids) @@ -210,16 +226,16 @@ def __init__( self._lsf_queue_param = ['-q', lsf_queue] if lsf_queue else [] def _submit(self, job_name, shell_script): - command = ['bsub'] + self._leader_job_resource_param + self._lsf_queue_param + [ - '-env', 'all', '-J', make_caper_leader_job_name(job_name), - shell_script - ] + command = ( + ['bsub'] + + self._leader_job_resource_param + + self._lsf_queue_param + + ['-env', 'all', '-J', make_caper_leader_job_name(job_name), shell_script] + ) return self._run_command(command) def _list(self): - return self._run_command([ - 'bjobs', '-u', get_user_from_os_environ() - ]) + return self._run_command(['bjobs', '-u', get_user_from_os_environ()]) def _abort(self, job_ids): return self._run_command(['bkill'] + job_ids) diff --git a/caper/nb_subproc_thread.py b/caper/nb_subproc_thread.py index a56dbafc..bd73ab7d 100644 --- a/caper/nb_subproc_thread.py +++ b/caper/nb_subproc_thread.py @@ -1,10 +1,9 @@ import logging -import time import signal +import time from subprocess import PIPE, Popen from threading import Thread - logger = logging.getLogger(__name__) interrupted = False terminated = False @@ -181,8 +180,7 @@ def _popen( on_stderr=None, on_finish=None, ): - """Wrapper for subprocess.Popen(). - """ + """Wrapper for subprocess.Popen().""" global terminated global interrupted diff --git a/caper/wdl_parser.py b/caper/wdl_parser.py index ce732672..04bae0d9 100644 --- a/caper/wdl_parser.py +++ b/caper/wdl_parser.py @@ -16,8 +16,7 @@ class WDLParser: BASENAME_IMPORTS = 'imports.zip' def __init__(self, wdl): - """Wraps miniwdl's parse_document(). - """ + """Wraps miniwdl's parse_document().""" u = AutoURI(wdl) if not u.exists: raise FileNotFoundError('WDL does not exist: wdl={wdl}'.format(wdl=wdl)) diff --git a/docs/resource_param.md b/docs/resource_param.md index be45a41f..7cd08df7 100644 --- a/docs/resource_param.md +++ b/docs/resource_param.md @@ -25,7 +25,7 @@ slurm-leader-job-resource-param=-t 48:00:00 --mem 4G # It is for HPC backends only (slurm, sge, pbs and lsf). 
# It is not recommended to change it unless your cluster has custom resource settings. # See https://github.com/ENCODE-DCC/caper/blob/master/docs/resource_param.md for details. -slurm-resource-param=-n 1 --ntasks-per-node=1 --cpus-per-task=${cpu} ${if defined(memory_mb) then "--mem=" else ""}${memory_mb}${if defined(memory_mb) then "M" else ""} ${if defined(time) then "--time=" else ""}${time*60} ${if defined(gpu) then "--gres=gpu:" else ""}${gpu} +slurm-resource-param=-n 1 --ntasks-per-node=1 --cpus-per-task=${cpu} ${if defined(memory_mb) then "--mem=" else ""}${memory_mb}${if defined(memory_mb) then "M" else ""} ${if defined(time) then "--time=" else ""}${time*60} ${if defined(gpu) then "--gres=gpu:" else ""}${gpu} ``` @@ -44,7 +44,7 @@ sge-pe= # It is for HPC backends only (slurm, sge, pbs and lsf). # It is not recommended to change it unless your cluster has custom resource settings. # See https://github.com/ENCODE-DCC/caper/blob/master/docs/resource_param.md for details. -sge-resource-param=${if cpu > 1 then "-pe " + sge_pe + " " else ""} ${if cpu > 1 then cpu else ""} ${true="-l h_vmem=$(expr " false="" defined(memory_mb)}${memory_mb}${true=" / " false="" defined(memory_mb)}${if defined(memory_mb) then cpu else ""}${true=")m" false="" defined(memory_mb)} ${true="-l s_vmem=$(expr " false="" defined(memory_mb)}${memory_mb}${true=" / " false="" defined(memory_mb)}${if defined(memory_mb) then cpu else ""}${true=")m" false="" defined(memory_mb)} ${"-l h_rt=" + time + ":00:00"} ${"-l s_rt=" + time + ":00:00"} ${"-l gpu=" + gpu} +sge-resource-param=${if cpu > 1 then "-pe " + sge_pe + " " else ""} ${if cpu > 1 then cpu else ""} ${true="-l h_vmem=$(expr " false="" defined(memory_mb)}${memory_mb}${true=" / " false="" defined(memory_mb)}${if defined(memory_mb) then cpu else ""}${true=")m" false="" defined(memory_mb)} ${true="-l s_vmem=$(expr " false="" defined(memory_mb)}${memory_mb}${true=" / " false="" defined(memory_mb)}${if defined(memory_mb) then cpu else ""}${true=")m" false="" defined(memory_mb)} ${"-l h_rt=" + time + ":00:00"} ${"-l s_rt=" + time + ":00:00"} ${"-l gpu=" + gpu} ``` ## PBS @@ -57,7 +57,7 @@ pbs-leader-job-resource-param=-l walltime=48:00:00,mem=4gb # It is for HPC backends only (slurm, sge, pbs and lsf). # It is not recommended to change it unless your cluster has custom resource settings. # See https://github.com/ENCODE-DCC/caper/blob/master/docs/resource_param.md for details. -pbs-resource-param=${"-lnodes=1:ppn=" + cpu}${if defined(gpu) then ":gpus=" + gpu else ""} ${if defined(memory_mb) then "-l mem=" else ""}${memory_mb}${if defined(memory_mb) then "mb" else ""} ${"-lwalltime=" + time + ":0:0"} +pbs-resource-param=${"-lnodes=1:ppn=" + cpu}${if defined(gpu) then ":gpus=" + gpu else ""} ${if defined(memory_mb) then "-l mem=" else ""}${memory_mb}${if defined(memory_mb) then "mb" else ""} ${"-lwalltime=" + time + ":0:0"} ``` ## LSF @@ -70,5 +70,5 @@ lsf-leader-job-resource-param=-W 2880 -M 4g # It is for HPC backends only (slurm, sge, pbs and lsf). # It is not recommended to change it unless your cluster has custom resource settings. # See https://github.com/ENCODE-DCC/caper/blob/master/docs/resource_param.md for details. 
-lsf-resource-param=${"-n " + cpu} ${if defined(gpu) then "-gpu " + gpu else ""} ${if defined(memory_mb) then "-M " else ""}${memory_mb}${if defined(memory_mb) then "m" else ""} ${"-W " + 60*time} +lsf-resource-param=${"-n " + cpu} ${if defined(gpu) then "-gpu " + gpu else ""} ${if defined(memory_mb) then "-M " else ""}${memory_mb}${if defined(memory_mb) then "m" else ""} ${"-W " + 60*time} ``` diff --git a/tests/conftest.py b/tests/conftest.py index 23f3310f..c849a60a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -45,8 +45,7 @@ def ci_prefix(request): @pytest.fixture(scope='session') def gcs_root(request): - """GCS root to generate test GCS URIs on. - """ + """GCS root to generate test GCS URIs on.""" return request.config.getoption('--gcs-root').rstrip('/') diff --git a/tests/test_caper_wdl_parser.py b/tests/test_caper_wdl_parser.py index 48e2d106..e5f54742 100644 --- a/tests/test_caper_wdl_parser.py +++ b/tests/test_caper_wdl_parser.py @@ -29,8 +29,8 @@ def test_properties(tmp_path): """Test the following properties. - - caper_docker - - caper_singularity + - caper_docker + - caper_singularity """ main_wdl = tmp_path / 'main.wdl' main_wdl.write_text(WDL_CONTENTS) diff --git a/tests/test_caper_workflow_opts.py b/tests/test_caper_workflow_opts.py index fe8ead35..10a29a8a 100644 --- a/tests/test_caper_workflow_opts.py +++ b/tests/test_caper_workflow_opts.py @@ -9,8 +9,7 @@ def test_create_file(tmp_path): - """Test without docker/singularity. - """ + """Test without docker/singularity.""" use_google_cloud_life_sciences = False gcp_zones = ['us-west-1', 'us-west-2'] slurm_partition = 'my_partition' @@ -135,8 +134,7 @@ def test_create_file_with_google_cloud_life_sciences(tmp_path): def test_create_file_docker(tmp_path): - """Test with docker and docker defined in WDL. - """ + """Test with docker and docker defined in WDL.""" wdl_contents = dedent( """\ version 1.0 @@ -205,8 +203,7 @@ def test_create_file_docker(tmp_path): def test_create_file_singularity(tmp_path): - """Test with singularity and singularity defined in WDL. - """ + """Test with singularity and singularity defined in WDL.""" wdl_contents = dedent( """\ version 1.0 diff --git a/tests/test_cli_run.py b/tests/test_cli_run.py index 534ad63e..4634f1a1 100644 --- a/tests/test_cli_run.py +++ b/tests/test_cli_run.py @@ -48,8 +48,7 @@ def test_mutually_exclusive_params(tmp_path, cmd): @pytest.mark.integration def test_run(tmp_path, cromwell, womtool, debug_caper): - """Will test most local parameters (run only) here. 
- """ + """Will test most local parameters (run only) here.""" make_directory_with_wdls(str(tmp_path)) wdl = tmp_path / 'main.wdl' inputs = tmp_path / 'inputs.json' @@ -116,8 +115,7 @@ def test_run_gcp_with_life_sciences_api( gcp_service_account_key_json, debug_caper, ): - """Test run with Google Cloud Life Sciences API - """ + """Test run with Google Cloud Life Sciences API""" out_gcs_bucket = os.path.join(gcs_root, 'caper_out', ci_prefix) tmp_gcs_bucket = os.path.join(gcs_root, 'caper_tmp') diff --git a/tests/test_cli_server_client_gcp.py b/tests/test_cli_server_client_gcp.py index a0c6bb16..c2b42e54 100644 --- a/tests/test_cli_server_client_gcp.py +++ b/tests/test_cli_server_client_gcp.py @@ -29,8 +29,7 @@ def test_server_client( gcp_service_account_key_json, debug_caper, ): - """Test server, client stuffs - """ + """Test server, client stuffs""" # server command line server_port = 8015 diff --git a/tests/test_cromwell.py b/tests/test_cromwell.py index 88b548a1..bd2276e7 100644 --- a/tests/test_cromwell.py +++ b/tests/test_cromwell.py @@ -115,8 +115,7 @@ def test_run(tmp_path, cromwell, womtool): def test_server(tmp_path, cromwell, womtool): - """Test Cromwell.server() method, which returns a Thread object. - """ + """Test Cromwell.server() method, which returns a Thread object.""" server_port = 8005 fileobj_stdout = sys.stdout diff --git a/tests/test_cromwell_backend.py b/tests/test_cromwell_backend.py index e77224c5..8e776523 100644 --- a/tests/test_cromwell_backend.py +++ b/tests/test_cromwell_backend.py @@ -8,8 +8,7 @@ def test_cromwell_backend_base_backend(): - """Test a property backend's getter, setter - """ + """Test a property backend's getter, setter""" bb1 = CromwellBackendBase('test1') backend_dict = {'a': 1, 'b': '2'} diff --git a/tests/test_cromwell_rest_api.py b/tests/test_cromwell_rest_api.py index 0baa505e..07f69668 100644 --- a/tests/test_cromwell_rest_api.py +++ b/tests/test_cromwell_rest_api.py @@ -49,8 +49,7 @@ def test_has_wildcard(test_input, expected): def test_all(tmp_path, cromwell, womtool): - """Test Cromwell.server() method, which returns a Thread object. - """ + """Test Cromwell.server() method, which returns a Thread object.""" server_port = 8010 fileobj_stdout = sys.stdout test_label = 'test_label' diff --git a/tests/test_hocon_string.py b/tests/test_hocon_string.py index 1b67148c..bb2d6268 100644 --- a/tests/test_hocon_string.py +++ b/tests/test_hocon_string.py @@ -93,8 +93,7 @@ def get_test_dict(with_include=False): def get_test_dict2(): - """Without "include" lines. - """ + """Without "include" lines.""" return {'backend': {'providers': {'gcp': {'actor-factory': 'GOOGLE'}}}} diff --git a/tests/test_resource_analysis.py b/tests/test_resource_analysis.py index fea2cef5..44ffa475 100644 --- a/tests/test_resource_analysis.py +++ b/tests/test_resource_analysis.py @@ -63,8 +63,7 @@ def test_resource_analysis_analyze_task(gcp_res_analysis_metadata): def test_resource_analysis_analyze(gcp_res_analysis_metadata): - """Test method analyze() which analyze all tasks defined in in_file_vars. - """ + """Test method analyze() which analyze all tasks defined in in_file_vars.""" analysis = LinearResourceAnalysis() analysis.collect_resource_data([gcp_res_analysis_metadata]) diff --git a/tests/test_wdl_parser.py b/tests/test_wdl_parser.py index 5fd1abb0..e5301cde 100644 --- a/tests/test_wdl_parser.py +++ b/tests/test_wdl_parser.py @@ -23,10 +23,10 @@ def test_properties(tmp_path): """Test the following properties. 
- - contents - - workflow_meta - - workflow_parameter_meta - - imports + - contents + - workflow_meta + - workflow_parameter_meta + - imports """ wdl = tmp_path / 'main.wdl' wdl.write_text(MAIN_WDL)
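
Notes on the substance of the diff above. One behavioral fix hides in the lint churn: in `caper/hpc.py`, the old `make_caper_leader_job_name` could never raise its intended `ValueError`. Its `format()` call referenced `illegal_chr`, a name defined nowhere (and even `illegal_char` exists only inside the `any()` generator), so an illegal job name crashed with a `NameError` instead. The rewritten explicit loop keeps the offending character in scope. Below is a minimal runnable sketch of the fixed behavior; the two module constants are assumptions for illustration (the docstring names comma, tab, and whitespace as the separators, and the CLI help text names the prefix `CAPER_LEADER_`) since their definitions fall outside this diff's context lines:

```python
# Assumed values -- the real constants are defined in caper/hpc.py
# outside the hunks shown above.
ILLEGAL_CHARS_IN_JOB_NAME = [',', '\t', ' ']
CAPER_LEADER_JOB_NAME_PREFIX = 'CAPER_LEADER_'


def make_caper_leader_job_name(job_name):
    """Reject job names containing a comma, tab or whitespace,
    since those characters can be used as separators."""
    for illegal_char in ILLEGAL_CHARS_IN_JOB_NAME:
        if illegal_char in job_name:
            # The loop variable is still in scope here, so the message
            # can name the offending character (the old any()-based
            # version could not).
            raise ValueError(
                'Illegal character {chr} in job name {job}'.format(
                    chr=illegal_char, job=job_name
                )
            )
    return CAPER_LEADER_JOB_NAME_PREFIX + job_name


assert make_caper_leader_job_name('my_wf') == 'CAPER_LEADER_my_wf'
try:
    make_caper_leader_job_name('bad name')
except ValueError as err:
    assert 'Illegal character' in str(err)
```

Nearly everything else is mechanical: black 22.3.0 honors the magic trailing comma, so argument lists already ending in `,` are exploded one element per line (most of the churn in `caper/caper_args.py` and `caper/hpc.py`), one-line docstrings are collapsed onto a single physical line, imports are consolidated and alphabetized, and unused names are dropped (`subprocess` in `cli.py`, `namedtuple` in `hpc.py`, the unused subparser variables in `caper_args.py`). The `--db` default now references `CromwellBackendDatabase.DB_FILE` rather than the `DEFAULT_DB` alias, presumably the same value.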
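The `*-resource-param` strings in `docs/resource_param.md` are templates that Cromwell interpolates per task: each `${...}` placeholder is filled from the task's runtime attributes (`cpu`, `memory_mb`, `time`, `gpu`), and the `if defined(x) then ... else ""` guards make a flag disappear when the task does not set that attribute. A rough Python imitation of that substitution for the SLURM template, with illustrative attribute values (Cromwell performs the real interpolation; this sketch only mirrors the template's logic):

```python
# Illustrative runtime attributes for one task; `gpu` is unset.
cpu, memory_mb, time, gpu = 4, 8000, 24, None

# Mirrors: -n 1 --ntasks-per-node=1 --cpus-per-task=${cpu}
#          ${if defined(memory_mb) then "--mem=" else ""}${memory_mb}${if defined(memory_mb) then "M" else ""}
#          ${if defined(time) then "--time=" else ""}${time*60}
#          ${if defined(gpu) then "--gres=gpu:" else ""}${gpu}
flags = [
    '-n 1',
    '--ntasks-per-node=1',
    f'--cpus-per-task={cpu}',
    f'--mem={memory_mb}M' if memory_mb is not None else '',
    f'--time={time * 60}' if time is not None else '',
    f'--gres=gpu:{gpu}' if gpu is not None else '',
]
rendered = ' '.join(flag for flag in flags if flag)
assert rendered == (
    '-n 1 --ntasks-per-node=1 --cpus-per-task=4 --mem=8000M --time=1440'
)
```

So a 4-CPU, 8000 MB, 24-hour task is submitted with `--mem=8000M --time=1440` (minutes) and no `--gres` flag at all, which is why the docs advise against editing these templates unless the cluster has custom resource settings.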