Merge pull request #141 from charles-cowart/movi_day
Updates from production; support for the new human-filtering implementation
charles-cowart authored Jun 13, 2024
2 parents 04d7af0 + de6b479 commit b9bdc00
Showing 16 changed files with 189 additions and 2,450 deletions.
3 changes: 0 additions & 3 deletions sequence_processing_pipeline/FastQCJob.py
@@ -107,9 +107,6 @@ def _find_projects(self, path_to_run_id_data_fastq_dir, is_raw_input):
i1_only = [x for x in files if '_I1_' in x]
i2_only = [x for x in files if '_I2_' in x]

if not self.is_amplicon and len(i1_only) != len(i2_only):
raise PipelineError('counts of I1 and I2 files do not match')

if r1_only:
tmp = ' '.join(r1_only)
if 'trimmed_sequences' in tmp:
2 changes: 1 addition & 1 deletion sequence_processing_pipeline/Job.py
@@ -423,7 +423,7 @@ def extract_project_names_from_fastq_dir(self, path_to_fastq_dir):
# Hence, this method filters for names that fit the above format.
results = []
for dir_name in tmp:
m = re.match(r'^(\w+_\d{5})$', dir_name)
m = re.match(r'^(\w[\w\-_]*_\d{5})$', dir_name)
if m:
results.append(m.groups(0)[0])

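For reference, the broadened pattern now accepts project directory names that contain hyphens ahead of the trailing five-digit Qiita ID; the old \w+-only pattern rejected those. A minimal sketch with hypothetical directory names:

import re

OLD = re.compile(r'^(\w+_\d{5})$')
NEW = re.compile(r'^(\w[\w\-_]*_\d{5})$')

# Hypothetical directory names; only the <project>_<qiita-id> shape matters.
for name in ('MyProject_12345', 'My-Project_12345', 'Other_Project_00001', 'logs'):
    print(f'{name}: old={bool(OLD.match(name))} new={bool(NEW.match(name))}')

# MyProject_12345: old=True new=True
# My-Project_12345: old=False new=True   (hyphenated names now accepted)
# Other_Project_00001: old=True new=True
# logs: old=False new=False
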
89 changes: 61 additions & 28 deletions sequence_processing_pipeline/NuQCJob.py
@@ -43,8 +43,8 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path,
minimap_database_paths, queue_name, node_count,
wall_time_limit, jmem, fastp_path, minimap2_path,
samtools_path, modules_to_load, qiita_job_id,
max_array_length, known_adapters_path, bucket_size=8,
length_limit=100, cores_per_task=4):
max_array_length, known_adapters_path, movi_path, gres_value,
pmls_path, bucket_size=8, length_limit=100, cores_per_task=4):
"""
Submit a slurm job where the contents of fastq_root_dir are processed
using fastp, minimap2, and samtools. Human-genome sequences will be
@@ -63,6 +63,9 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path,
:param modules_to_load: A list of Linux module names to load
:param qiita_job_id: identify Torque jobs using qiita_job_id
:param known_adapters_path: The path to an .fna file of known adapters.
:param movi_path: The path to the Movi executable.
:param gres_value: Cluster-related parameter.
:param pmls_path: The path to the pmls script.
:param bucket_size: the size in GB of each bucket to process
:param length_limit: reads shorter than this will be discarded.
:param cores_per_task: Number of CPU cores per node to request.
@@ -79,17 +82,20 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path,
self.sample_ids = metadata['sample_ids']
self.project_data = metadata['projects']
self.chemistry = metadata['chemistry']
self.minimap_database_paths = minimap_database_paths
self.mmi_file_paths = minimap_database_paths
self.queue_name = queue_name
self.node_count = node_count
self.wall_time_limit = wall_time_limit
# raise an Error if jmem is not a valid floating point value.
self.jmem = float(jmem)
self.jmem = str(int(jmem))
self.fastp_path = fastp_path
self.minimap2_path = minimap2_path
self.samtools_path = samtools_path
self.qiita_job_id = qiita_job_id
self.suffix = 'fastq.gz'
self.movi_path = movi_path
self.gres_value = gres_value
self.pmls_path = pmls_path

# for projects that use sequence_processing_pipeline as a dependency,
# jinja_env must be set to sequence_processing_pipeline's root path,
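For downstream callers, NuQCJob now also requires movi_path, gres_value, and pmls_path. A hypothetical instantiation, assuming the import path matches the package layout; all paths and values below are made up for illustration:

from sequence_processing_pipeline.NuQCJob import NuQCJob

job = NuQCJob('/seq/run1/Data/Fastq',               # fastq_root_dir
              '/working/output',                     # output_path
              '/working/sample-sheet.csv',           # sample_sheet_path
              ['/db/host_a.mmi', '/db/host_b.mmi'],  # minimap_database_paths
              'qiita', 1, 5760, 8,                   # queue, nodes, wall time, jmem
              'fastp', 'minimap2', 'samtools',
              ['fastp_0.20.1', 'minimap2_2.28', 'samtools_1.12'],
              'some-qiita-job-id', 1000,
              '/db/known-adapters.fna',
              movi_path='/opt/movi/movi-default',
              gres_value=4,
              pmls_path='/scripts/pmls.py')
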
@@ -243,7 +249,7 @@ def run(self, callback=None):

self.counts[self.batch_prefix] = batch_count

export_params = [f"MMI={self.minimap_database_paths}",
export_params = [f"MMI={self.mmi_file_paths}",
f"PREFIX={batch_location}",
f"OUTPUT={self.output_path}",
f"TMPDIR={self.temp_dir}"]
@@ -396,32 +402,51 @@ def _process_sample_sheet(self):
'projects': lst,
'sample_ids': sample_ids}

def _generate_mmi_filter_cmds(self, working_dir):
initial_input = join(working_dir, "seqs.r1.fastq")
final_output = join(working_dir, "seqs.r1.ALIGN.fastq")

cmds = []

tmp_file1 = join(working_dir, "foo")
tmp_file2 = join(working_dir, "bar")

cores_to_allocate = int(self.cores_per_task / 2)

for count, mmi_db_path in enumerate(self.mmi_file_paths):
if count == 0:
# prime initial state with unfiltered file and create first of
# two tmp files.
input = initial_input
output = tmp_file1
elif count % 2 == 0:
# alternate between two tmp file names so that the input and
# output are never the same file.
input = tmp_file2
output = tmp_file1
else:
input = tmp_file1
output = tmp_file2

cmds.append(f"minimap2 -2 -ax sr -t {cores_to_allocate} "
f"{mmi_db_path} {input} -a | samtools fastq -@ "
f"{cores_to_allocate} -f 12 -F 256 > {output}")

# rename the latest tmp file to the final output filename.
cmds.append(f"mv {output} {final_output}")

# simple cleanup erases tmp files if they exist.
cmds.append(f"[ -e {tmp_file1} ] && rm {tmp_file1}")
cmds.append(f"[ -e {tmp_file2} ] && rm {tmp_file2}")

return "\n".join(cmds)

def _generate_job_script(self, max_bucket_size):
# bypass generating job script for a force-fail job, since it is
# not needed.
if self.force_job_fail:
return None

gigabyte = 1073741824

if max_bucket_size > (3 * gigabyte):
gres_value = 4
else:
gres_value = 2

# As slurm expects memory sizes to be integer values, convert results
# to int to drop floating point component before passing to jinja2 as
# a string.
if max_bucket_size < gigabyte:
mod_wall_time_limit = self.wall_time_limit
mod_jmem = str(int(self.jmem))
elif max_bucket_size < (2 * gigabyte):
mod_wall_time_limit = self.wall_time_limit * 1.5
mod_jmem = str(int(self.jmem * 4.5))
else:
mod_wall_time_limit = self.wall_time_limit * 2
mod_jmem = str(int(self.jmem * 7.5))

job_script_path = join(self.output_path, 'process_all_fastq_files.sh')
template = self.jinja_env.get_template("nuqc_job.sh")

@@ -443,6 +468,11 @@ def _generate_job_script(self, max_bucket_size):
if not exists(splitter_binary):
raise ValueError(f'{splitter_binary} does not exist.')

# this method relies on an environment variable defined in nu_qc.sh
# used to define where unfiltered fastq files are and where temp
# files can be created. (${jobd})
mmi_filter_cmds = self._generate_mmi_filter_cmds("${jobd}")

with open(job_script_path, mode="w", encoding="utf-8") as f:
# the job resources should come from a configuration file

@@ -453,8 +483,8 @@
f.write(template.render(job_name=job_name,
queue_name=self.queue_name,
# should be 4 * 24 * 60 = 4 days
wall_time_limit=mod_wall_time_limit,
mem_in_gb=mod_jmem,
wall_time_limit=self.wall_time_limit,
mem_in_gb=self.jmem,
# Note NuQCJob now maps node_count to
# SLURM -N parameter to act like other
# Job classes.
@@ -471,7 +501,10 @@
splitter_binary=splitter_binary,
modules_to_load=mtl,
length_limit=self.length_limit,
gres_value=gres_value))
gres_value=self.gres_value,
movi_path=self.movi_path,
mmi_filter_cmds=mmi_filter_cmds,
pmls_path=self.pmls_path))

return job_script_path

9 changes: 8 additions & 1 deletion sequence_processing_pipeline/Pipeline.py
@@ -688,8 +688,15 @@ def get_project_info(self, short_names=False):
for res in bioinformatics.to_dict('records'):
p_name, q_id = self._parse_project_name(res['Sample_Project'],
short_names)
contains_replicates = False

if 'contains_replicates' in res:
if res['contains_replicates'] == 'True':
contains_replicates = True

results.append({'project_name': p_name,
'qiita_id': q_id})
'qiita_id': q_id,
'contains_replicates': contains_replicates})

return results

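Project records returned by get_project_info() now carry a contains_replicates flag, defaulting to False and set to True only when the sample sheet's Bioinformatics section stores the string 'True'. A minimal illustration of the parsing; the helper name and record values are hypothetical:

def parse_contains_replicates(res):
    # Mirrors the new logic: the sheet stores the flag as the string 'True'.
    return res.get('contains_replicates') == 'True'

print(parse_contains_replicates({'Sample_Project': 'MyProject_12345',
                                 'contains_replicates': 'True'}))    # True
print(parse_contains_replicates({'Sample_Project': 'Other_67890'}))  # False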