
Commit

Merge pull request #2 from AmandaBirmingham/master
Charlie's changes from master
AmandaBirmingham authored Oct 4, 2024
2 parents d39ac10 + d827239 commit 22791b8
Showing 10 changed files with 422 additions and 95 deletions.
206 changes: 123 additions & 83 deletions sequence_processing_pipeline/Job.py
@@ -12,6 +12,27 @@


class Job:
slurm_status_terminated = ['BOOT_FAIL', 'CANCELLED', 'DEADLINE', 'FAILED',
'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED',
'REVOKED', 'TIMEOUT']

slurm_status_successful = ['COMPLETED']

slurm_status_running = ['COMPLETING', 'CONFIGURING', 'PENDING', 'REQUEUED',
'REQUEUE_FED', 'REQUEUE_HOLD', 'RESIZING',
'RESV_DEL_HOLD', 'RUNNING', 'SIGNALING',
'SPECIAL_EXIT', 'STAGE_OUT', 'STOPPED',
'SUSPENDED']

slurm_status_not_running = (slurm_status_terminated +
slurm_status_successful)

slurm_status_all_states = (slurm_status_terminated +
slurm_status_successful +
slurm_status_running)

polling_interval_in_seconds = 60

def __init__(self, root_dir, output_path, job_name, executable_paths,
max_array_length, modules_to_load=None):
"""
@@ -191,20 +212,99 @@ def _system_call(self, cmd, allow_return_codes=[], callback=None):

return {'stdout': stdout, 'stderr': stderr, 'return_code': return_code}

def wait_on_job_ids(self, job_ids, callback=None):
'''
Wait for the given job-ids to finish running before returning.
:param job_ids: A list of Slurm job-ids
:param callback: Optional callback function that receives status updates.
:return: A dictionary of job-ids and their current statuses.
'''

# wait_on_job_ids was broken out of submit_job() and updated to monitor
# multiple job ids. This allows multiple jobs to be submitted to Slurm in
# parallel, with a single wait_on_job_ids() call waiting on all of them
# before returning and optionally invoking the callback for each job-id.

# ensure all ids are strings so that join() works properly.
job_ids = [str(x) for x in job_ids]

def query_slurm(job_ids):
# internal function query_slurm encapsulates the handling of
# squeue.
count = 0
while True:
result = self._system_call("squeue -t all -j "
f"{','.join(job_ids)} "
"-o '%F,%A,%T'")

if result['return_code'] == 0:
# there was no issue w/squeue, break this loop and
# continue.
break
else:
# there was likely an intermittent issue w/squeue. Pause
# and wait before trying a few more times. If the problem
# persists then report the error and exit.
count += 1

if count > 3:
raise ExecFailedError(result['stderr'])

sleep(60)

lines = result['stdout'].split('\n')
lines.pop(0) # remove header
lines = [x.split(',') for x in lines if x != '']

jobs = {}
child_jobs = {}
for job_id, unique_id, state in lines:
jobs[unique_id] = state

if unique_id != job_id:
child_jobs[unique_id] = job_id # job is a child job

return jobs, child_jobs

while True:
jobs, child_jobs = query_slurm(job_ids)

for jid in job_ids:
logging.debug("JOB %s: %s" % (jid, jobs[jid]))
if callback is not None:
callback(jid=jid, status=jobs[jid])

children = [x for x in child_jobs if child_jobs[x] == jid]
if len(children) == 0:
logging.debug("\tNO CHILDREN")
for cid in children:
logging.debug("\tCHILD JOB %s: %s" % (cid, jobs[cid]))
status = [jobs[x] in Job.slurm_status_not_running for x in job_ids]

if set(status) == {True}:
# all jobs either completed successfully or terminated.
break

sleep(Job.polling_interval_in_seconds)

return jobs
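
For context, a hypothetical caller of the new method might look like the sketch below; the `job` instance, job-ids, and callback are assumptions for illustration, not code from this commit.

def log_status(jid, status):
    # simple callback: wait_on_job_ids() reports each job-id's current state
    print(f"job {jid}: {status}")

# assumes `job` is an existing Job (or subclass) instance and that the
# listed Slurm job-ids were already submitted via sbatch.
final_states = job.wait_on_job_ids(["12345", "12346"], callback=log_status)
# e.g. {'12345': 'COMPLETED', '12346': 'FAILED', ...}
# (child array tasks appear as additional keys)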

def submit_job(self, script_path, job_parameters=None,
script_parameters=None, wait=True,
exec_from=None, callback=None):
"""
Submit a Torque job script and optionally wait for it to finish.
:param script_path: The path to a Torque job (bash) script.
Submit a Slurm job script and optionally wait for it to finish.
:param script_path: The path to a Slurm job (bash) script.
:param job_parameters: Optional parameters for scheduler submission.
:param script_parameters: Optional parameters for your job script.
:param wait: Set to False to submit job and not wait.
:param exec_from: Set working directory to execute command from.
:param callback: Optional callback function that receives status updates.
:return: Dictionary containing the job's id, name, status, and
elapsed time. Raises PipelineError if job could not be submitted or
if job was unsuccessful.
:return: If wait is True, a dictionary containing the job's id and
status. If wait is False, the Slurm job-id of the submitted
job. Raises PipelineError if job could not be submitted or if
job was unsuccessful.
"""
if job_parameters:
cmd = 'sbatch %s %s' % (job_parameters, script_path)
@@ -230,95 +330,35 @@ def submit_job(self, script_path, job_parameters=None,

job_id = stdout.strip().split()[-1]

job_info = {'job_id': None, 'job_name': None, 'job_state': None,
'elapsed_time': None}
# Just to give some time for everything to be set up properly
sleep(10)

exit_count = 0

while wait:
result = self._system_call(f"sacct -P -n --job {job_id} --format "
"JobID,JobName,State,Elapsed,ExitCode")

if result['return_code'] != 0:
# sacct did not successfully submit the job.
raise ExecFailedError(result['stderr'])

# [-1] remove the extra \n
jobs_data = result['stdout'].split('\n')[:-1]
states = dict()
estatuses = dict()
for i, jd in enumerate(jobs_data):
jid, jname, jstate, etime, estatus = jd.split('|')
if jid.endswith('.extern') or jid.endswith('.batch'):
continue

if i == 0:
job_info['job_id'] = jid
job_info['job_name'] = jname
job_info['elapsed_time'] = etime
job_info['exit_status'] = estatus

if jstate not in states:
states[jstate] = 0
states[jstate] += 1

if estatus not in estatuses:
estatuses[estatus] = 0
estatuses[estatus] += 1
if wait is False:
# return job_id since that is the only information for this new
# job that we have available. User should expect that this is
# not a dict if they explicitly set wait=False.
return job_id

job_info['job_state'] = f'{states}'
job_info['exit_status'] = f'{estatuses}'
# the user is expecting a dict with 'job_id' and 'job_state'
# attributes, while wait_on_job_ids() returns a dict w/job_ids as keys
# and their job statuses as values. This must be munged before
# returning to the user.
results = self.wait_on_job_ids([job_id], callback=callback)

if callback is not None:
callback(jid=job_id, status=f'{states}')

logging.debug("Job info: %s" % job_info)

# if job is completed after having run or exited after having
# run, then stop waiting.
if not set(states) - {'COMPLETED', 'FAILED', 'CANCELLED'}:
# break
exit_count += 1
job_result = {'job_id': job_id, 'job_state': results[job_id]}

if exit_count > 4:
break

sleep(10)
if callback is not None:
callback(jid=job_id, status=job_result['job_state'])

if job_info['job_id'] is not None:
# job was once in the queue
if callback is not None:
callback(jid=job_id, status=job_info['job_state'])

if set(states) == {'COMPLETED'}:
if 'exit_status' in job_info:
if set(estatuses) == {'0:0'}:
# job completed successfully
return job_info
else:
exit_status = job_info['exit_status']
raise JobFailedError(f"job {job_id} exited with exit_"
f"status {exit_status}")
else:
# with no other info, assume job completed successfully
return job_info
else:
# job exited unsuccessfully
raise JobFailedError(f"job {job_id} exited with status "
f"{job_info['job_state']}")
if job_result['job_state'] == 'COMPLETED':
return job_result
else:
# job was never in the queue - return an error.
if callback is not None:
callback(jid=job_id, status='ERROR')

raise JobFailedError(f"job {job_id} never appeared in the "
"queue.")
raise JobFailedError(f"job {job_id} exited with status "
f"{job_result['job_state']}")

def _group_commands(self, cmds):
# break list of commands into chunks of max_array_length (Typically
# 1000 for Torque job arrays). To ensure job arrays are never more
# 1000 for Slurm job arrays). To ensure job arrays are never more
# than 1000 jobs long, we'll chain additional commands together, and
# evenly distribute them amongst the first 1000.
cmds.sort()
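
The chunk-and-chain behavior described in the comment above could be approximated by the following standalone sketch; the helper name and the chaining separator are assumptions rather than the actual _group_commands() implementation.

def group_commands(cmds, max_array_length=1000):
    # never create more array tasks than max_array_length
    cmds = sorted(cmds)
    n_slots = min(len(cmds), max_array_length)
    slots = [[] for _ in range(n_slots)]
    for i, cmd in enumerate(cmds):
        # round-robin assignment evenly distributes any overflow commands
        slots[i % n_slots].append(cmd)
    # chain each slot's commands so a single array task runs them in order
    return [";".join(slot) for slot in slots]
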
19 changes: 16 additions & 3 deletions sequence_processing_pipeline/NuQCJob.py
@@ -44,7 +44,8 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path,
wall_time_limit, jmem, fastp_path, minimap2_path,
samtools_path, modules_to_load, qiita_job_id,
max_array_length, known_adapters_path, movi_path, gres_value,
pmls_path, bucket_size=8, length_limit=100, cores_per_task=4):
pmls_path, additional_fastq_tags, bucket_size=8,
length_limit=100, cores_per_task=4):
"""
Submit a slurm job where the contents of fastq_root_dir are processed
using fastp, minimap2, and samtools. Human-genome sequences will be
@@ -69,6 +70,8 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path,
:param bucket_size: the size in GB of each bucket to process
:param length_limit: reads shorter than this will be discarded.
:param cores_per_task: Number of CPU cores per node to request.
:param additional_fastq_tags: A list of fastq tags to preserve during
filtering.
"""
super().__init__(fastq_root_dir,
output_path,
@@ -96,6 +99,7 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path,
self.movi_path = movi_path
self.gres_value = gres_value
self.pmls_path = pmls_path
self.additional_fastq_tags = additional_fastq_tags

# for projects that use sequence_processing_pipeline as a dependency,
# jinja_env must be set to sequence_processing_pipeline's root path,
@@ -401,6 +405,14 @@ def _generate_mmi_filter_cmds(self, working_dir):

cores_to_allocate = int(self.cores_per_task / 2)

if len(self.additional_fastq_tags) > 0:
# add tags for known metadata types that fastq files may have
# been annotated with. Samtools will safely ignore tags that
# are not present.
tags = " -T %s" % ','.join(self.additional_fastq_tags)
else:
tags = ""

for count, mmi_db_path in enumerate(self.mmi_file_paths):
if count == 0:
# prime initial state with unfiltered file and create first of
@@ -416,9 +428,10 @@
input = tmp_file1
output = tmp_file2

cmds.append(f"minimap2 -2 -ax sr -t {cores_to_allocate} "
cmds.append(f"minimap2 -2 -ax sr -y -t {cores_to_allocate} "
f"{mmi_db_path} {input} -a | samtools fastq -@ "
f"{cores_to_allocate} -f 12 -F 256 > {output}")
f"{cores_to_allocate} -f 12 -F 256{tags} > "
f"{output}")

# rename the latest tmp file to the final output filename.
cmds.append(f"mv {output} {final_output}")
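
For reference, once additional_fastq_tags is non-empty the rendered filter command gains a samtools -T option; the sketch below rebuilds one such command with made-up paths, tag names, and core count.

additional_fastq_tags = ["BX", "RX"]  # hypothetical tag names
tags = (" -T %s" % ",".join(additional_fastq_tags)
        if additional_fastq_tags else "")
cores_to_allocate = 2
cmd = (f"minimap2 -2 -ax sr -y -t {cores_to_allocate} "
       f"human_db.mmi seqs.interleaved.fastq -a | samtools fastq -@ "
       f"{cores_to_allocate} -f 12 -F 256{tags} > seqs.tmp.one.fastq")
# minimap2's -y and samtools fastq's -T carry the extra per-read tags
# through the host-filtering step instead of dropping them.
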
39 changes: 38 additions & 1 deletion sequence_processing_pipeline/Pipeline.py
@@ -14,6 +14,7 @@
from collections import defaultdict
from datetime import datetime
from xml.etree import ElementTree as ET
from metapool.prep import PREP_MF_COLUMNS


logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)
@@ -235,6 +236,38 @@ def __init__(self, configuration_file_path, run_id, sample_sheet_path,

self._configure_profile()

def identify_reserved_words(self, words):
'''
Returns a list of words that should not appear as column names in any
project referenced in the Pipeline's sample-sheet/pre-prep file.
:param words: A list of words that may include reserved words.
:return: The subset of the given words that are reserved, compared
case-insensitively.
'''

# Only strings used as column names in pre-prep files are currently
# considered 'reserved' as loading a pre-prep file containing these
# column names will fail if one or more of the strings already appears
# as a column name in a study's sample metadata table.

# This implementation assumes some understanding of metapool's impl,
# specifically how the proper set of prep-info file columns is
# generated. For now the functionality is defined here, as this
# area of metapool is currently in flux.
if self.mapping_file is not None:
reserved = PREP_MF_COLUMNS
else:
# results will be dependent on SheetType and SheetVersion of
# the sample-sheet. Since all columns in a prep-info file are
# lower()ed before writing out to file, the word must be
# reserved in all case forms. e.g.: 'Sample_Well' and 'Sample_well'
# are both forms of 'sample_well'.
reserved = [x.lower() for x in
self.sample_sheet.CARRIED_PREP_COLUMNS] + \
self.sample_sheet.GENERATED_PREP_COLUMNS

return list(set([x.lower() for x in words]) & set(reserved))
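
The matching above is a case-insensitive intersection; the toy example below shows the same logic with an invented reserved list (the real names come from metapool's PREP_MF_COLUMNS or the sample-sheet's prep-column constants).

reserved = ["sample_well", "barcode"]  # invented for illustration
words = ["Sample_Well", "tube_id", "BARCODE"]
matches = list(set(x.lower() for x in words) & set(reserved))
# matches contains 'sample_well' and 'barcode'; 'tube_id' is not reserved
# (set intersection, so ordering is not guaranteed)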

def _configure_profile(self):
# extract the instrument type from self.run_dir and the assay type
# from self.sample_sheet (or self.mapping_file).
@@ -723,10 +756,14 @@ def get_project_info(self, short_names=False):
for res in bioinformatics.to_dict('records'):
p_name, q_id = self._parse_project_name(res['Sample_Project'],
short_names)

# parsed SampleSheet() objects should now contain only
# boolean values in the contains_replicates column.

contains_replicates = False

if 'contains_replicates' in res:
if res['contains_replicates'] == 'True':
if res['contains_replicates'] is True:
contains_replicates = True

results.append({'project_name': p_name,

