Skip to content

Commit

Permalink
add test truncate
Browse files Browse the repository at this point in the history
  • Loading branch information
thanhleviet committed Jul 1, 2024
1 parent 98a29de commit ab4dc63
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 42 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# q2-usearch
[![Test and lint](https://github.com/quadram-institute-bioscience/q2-usearch/actions/workflows/ci.yml/badge.svg)](https://github.com/quadram-institute-bioscience/q2-usearch/actions/workflows/ci.yml)

A [QIIME 2](https://qiime2.org) plugin [developed](https://develop.qiime2.org) by Core Bioinformatics, QIB.

Expand Down
80 changes: 41 additions & 39 deletions q2_usearch/_fastqx.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
FastqManifestFormat, YamlFormat)


from ._utils import run_command, _fasta_with_sizes, _error_on_nonoverlapping_ids
from ._utils import run_command, validate_params
import logging as logger

logger.basicConfig(level=logger.DEBUG, format='%(asctime)s - %(message)s')
Expand Down Expand Up @@ -90,12 +90,13 @@ def fastx_uniques(sequences: QIIME1DemuxDirFmt,

def fastx_truncate(
unique_seqs: SingleLanePerSampleSingleEndFastqDirFmt,
trunclen: int = 150,
trunclen: int = 200,
stripleft: int = 0,
stripright: int = 0,
padlen: int = 100,
padlen: int = 200,
relabel: bool = False,
) -> (SingleLanePerSampleSingleEndFastqDirFmt): # type: ignore
validate_params([trunclen, stripleft, stripright, padlen])
truncated_seqs = SingleLanePerSampleSingleEndFastqDirFmt()

# Read the manifest file
Expand All @@ -104,6 +105,7 @@ def fastx_truncate(
header=0,
comment="#",
)
# logger.info(f"MANIFEST: {manifest}")

# Update file paths in manifest
manifest["filename"] = manifest["filename"].apply(
Expand All @@ -119,7 +121,7 @@ def fastx_truncate(
missing_files = []

# Process each sample
for _, row in manifest.iterrows():
for i, row in manifest.iterrows():
sample_id = row["sample-id"]
input_fp = row["filename"]

Expand All @@ -137,16 +139,22 @@ def fastx_truncate(
else:
uncompressed_fp = input_fp

gz_truncated_path, fq_truncated_path = _get_output_paths(
truncated_seqs, sample_id, i, 1
)

# Generate output path
output_fp = os.path.join(temp_dir, f"{sample_id}_truncated.fastq")
# _output_file_name = f"{sample_id}_L001_R1_001.fastq.gz"
_output_file_name = os.path.basename(fq_truncated_path)
output_fp = os.path.join(temp_dir, _output_file_name)

# Prepare USEARCH command arguments
cmd = [
"usearch",
"-fastx_truncate",
uncompressed_fp,
"-fastqout",
output_fp,
fq_truncated_path,
]

if trunclen is not None:
Expand All @@ -162,22 +170,29 @@ def fastx_truncate(

# Process reads
run_command(cmd)

# Check if the output file exists and is not empty
if os.path.exists(output_fp) and os.path.getsize(output_fp) > 0:
if (
os.path.exists(fq_truncated_path)
and os.path.getsize(fq_truncated_path) > 0
):
# Compress output file
final_output_fp = os.path.join(
str(truncated_seqs), f"{sample_id}_truncated.fastq.gz"
)
with open(output_fp, "rb") as f_in:
with gzip.open(final_output_fp, "wb") as f_out:
final_output_fp = os.path.join(str(truncated_seqs), _output_file_name)
with open(fq_truncated_path, "rb") as f_in:
with gzip.open(gz_truncated_path, "wb") as f_out:
logger.info(f"Copying {output_fp} to {final_output_fp}")
shutil.copyfileobj(f_in, f_out)

os.remove(fq_truncated_path)
logger.info(f"Output file created: {final_output_fp}")
logger.info(
f"{sample_id},{os.path.basename(final_output_fp)},forward\n"
)
# Update manifest
# Bear in mind the fasqt file is copied into a gzipped file
truncated_manifest_fh.write(
f"{sample_id},{os.path.basename(final_output_fp)},forward\n"
f"{sample_id},{os.path.basename(final_output_fp)}.gz,forward\n"
)
else:
logger.info("No sample passed!")
# Record missing file
missing_files.append(
{
Expand All @@ -189,7 +204,9 @@ def fastx_truncate(

truncated_manifest_fh.close()
truncated_seqs.manifest.write_data(truncated_manifest, FastqManifestFormat)

logger.info(f"MANIFEST: {truncated_manifest.path}")
with open(truncated_manifest.path, "r") as f:
logger.info(f.read())
# Copy metadata
metadata = YamlFormat()
with open(os.path.join(str(unique_seqs), unique_seqs.metadata.pathspec), "r") as f:
Expand Down Expand Up @@ -321,34 +338,19 @@ def _merge_pairs_w_command_output(
): # type: ignore
# create formats
"""
Merges paired-end reads from a demultiplexed sequence input.
This function merges paired-end reads using the USEARCH algorithm, allowing for
various parameters to be specified that control the merging process. It handles
both the merging of reads that overlap and the retention of reads that do not
merge. The function also compresses the output reads and generates manifest files
for both merged and unmerged reads.
Merges paired-end reads from demultiplexed sequences using USEARCH.
Parameters:
- demultiplexed_seqs (SingleLanePerSamplePairedEndFastqDirFmt): The input demultiplexed sequences.
- maxdiffs (int): The maximum number of differences allowed in the overlap region.
- pctid (int): The minimum percentage identity required in the overlap.
- nostagger (bool): If True, staggered reads will not be merged.
- minmergelen (int): The minimum length of merged reads to be retained.
- maxmergelen (int): The maximum length of merged reads to be retained.
- minqual (int): The minimum quality score to keep a base during merging.
- minovlen (int): The minimum overlap length required to merge reads.
- relabel (str): The character to use for relabeling the merged reads.
- threads (int): The number of threads to use for the merging process.
- demultiplexed_seqs (SingleLanePerSamplePairedEndFastqDirFmt): Input sequences.
- maxdiffs, pctid, minmergelen, maxmergelen, minqual, minovlen (int): Parameters controlling merging.
- nostagger (bool): If True, excludes staggered reads from merging.
- relabel (str): Character for relabeling merged reads.
- threads (int): Number of threads for merging.
Returns:
- A tuple containing the command used for merging, the directory format object for merged reads,
and the directory format object for unmerged reads.
- Tuple with the USEARCH command, directory format objects for merged and unmerged reads.
The function first initializes the output formats and manifest files, then iterates over each
sample to merge the reads. It constructs and executes a USEARCH command for each sample, handling
both merged and unmerged outputs. Finally, it writes the manifest files and updates the metadata
for both merged and unmerged reads.
Initializes output formats, merges reads per sample with USEARCH, handles outputs, and writes manifest files.
"""
merged = SingleLanePerSampleSingleEndFastqDirFmt()
unmerged = SingleLanePerSamplePairedEndFastqDirFmt()
Expand Down
8 changes: 5 additions & 3 deletions q2_usearch/plugin_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,18 @@

plugin.methods.register_function(
function=q2_usearch._fastqx.fastx_truncate,
inputs={"unique_seqs": SequencesWithQuality},
inputs={
"unique_seqs": SampleData[SequencesWithQuality]
| SampleData[JoinedSequencesWithQuality]
},
parameters={
"trunclen": qiime2.plugin.Int % qiime2.plugin.Range(1, None),
"stripleft": qiime2.plugin.Int % qiime2.plugin.Range(0, None),
"stripright": qiime2.plugin.Int % qiime2.plugin.Range(0, None),
"padlen": qiime2.plugin.Int % qiime2.plugin.Range(1, None),
"relabel": qiime2.plugin.Bool,

},
outputs=[("truncated_seqs", SequencesWithQuality)],
outputs=[("truncated_seqs", SampleData[SequencesWithQuality])],
input_descriptions={
"unique_seqs": "The single-end demultiplexed sequences to be truncated. Input sequences should be the output of mergepairs step."
},
Expand Down
149 changes: 149 additions & 0 deletions q2_usearch/tests/test_fastx_truncate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import os
import pandas as pd
import unittest
import tempfile
import subprocess
import filecmp
from unittest.mock import patch
from qiime2.plugin.testing import TestPluginBase
from qiime2 import Artifact
from q2_types.per_sample_sequences import SingleLanePerSampleSingleEndFastqDirFmt
from q2_usearch._fastqx import fastx_truncate
from q2_usearch._utils import USearchError

import logging as logger
import gzip
logger.basicConfig(level=logger.DEBUG)

class TestFastxTruncate(TestPluginBase):
package = "q2_usearch.tests"

def setUp(self):
super().setUp()

# Load the real test file
sequences_fp = self.get_data_path("merged_sequences.qza")

# Import the file as a QIIME 2 Artifact
sequences_artifact = Artifact.load(sequences_fp)

# Get the SingleLanePerSampleSingleEndFastqDirFmt representation
self.sequences = sequences_artifact.view(
SingleLanePerSampleSingleEndFastqDirFmt
)

# @patch("q2_usearch._fastqx.run_command")
def test_fastx_truncate_default_params(self):
# Test with default parameters
truncated_seqs = fastx_truncate(self.sequences)
# Check if the returned object is of the correct type
self.assertIsInstance(truncated_seqs, SingleLanePerSampleSingleEndFastqDirFmt)

def test_fastx_truncate_custom_params(self):
# Test with custom parameters
truncated_seqs = fastx_truncate(
self.sequences,
trunclen=100,
stripleft=10,
stripright=5,
padlen=150,
relabel=True,
)

# Check if the returned object is of the correct type
self.assertIsInstance(truncated_seqs, SingleLanePerSampleSingleEndFastqDirFmt)

# Check the content of the truncated sequences manifest
manifest_path = os.path.join(
str(truncated_seqs), truncated_seqs.manifest.pathspec
)
logger.debug(f"Manifest path: {manifest_path}")

manifest = pd.read_csv(manifest_path, header=0, comment="#")
self.assertGreater(len(manifest), 0, "Truncated sequences manifest is empty")
logger.debug(f"Manifest content:\n{manifest}")
# Check if at least one truncated sequence file exists and is not empty
truncated_file = os.path.join(str(truncated_seqs), manifest.iloc[0]["filename"])
logger.debug(f"Truncated file path: {truncated_file}")

self.assertTrue(
os.path.exists(truncated_file),
f"Truncated sequence file does not exist: {truncated_file}",
)
self.assertGreater(
os.path.getsize(truncated_file),
0,
f"Truncated sequence file is empty: {truncated_file}",
)
sample_id = manifest.iloc[0]["sample-id"]
# Check if the sequences are actually truncated and processed according to the parameters
try:
with gzip.open(truncated_file, "rt") as f:
for line in f:
if line.startswith("@"): # FASTQ header
self.assertTrue(
sample_id in line,
f"Relabeling not applied: {line}",
)
continue
if line.startswith("+"): # Quality score identifier
break
self.assertLessEqual(
len(line.strip()),
100,
f"Sequence is longer than specified truncation length: {line}",
)
except gzip.BadGzipFile:
logger.error(f"File is not a valid gzip file: {truncated_file}")
raise
except Exception as e:
logger.error(f"Error reading file {truncated_file}: {str(e)}")
raise

logger.info("Test completed successfully")

@patch("q2_usearch._fastqx.run_command")
def test_fastx_truncate_error_handling(self, mock_run_command):
# Simulate an error in the USEARCH command
mock_run_command.side_effect = USearchError("USEARCH command failed")
# Test with invalid parameters to trigger an error
with self.assertRaises(USearchError):
fastx_truncate(self.sequences) # Invalid truncation length

def test_fastx_truncate_output_content(self):
# Run the function with real data
truncated_seqs = fastx_truncate(self.sequences, trunclen=100)

# Check the content of the truncated sequences manifest
manifest = pd.read_csv(
os.path.join(str(truncated_seqs), truncated_seqs.manifest.pathspec),
header=0,
comment="#",
)
self.assertGreater(len(manifest), 0, "Truncated sequences manifest is empty")

# Check if at least one truncated sequence file exists and is not empty
truncated_file = os.path.join(str(truncated_seqs), manifest.iloc[0]["filename"])
self.assertTrue(
os.path.exists(truncated_file), "Truncated sequence file does not exist"
)
self.assertGreater(
os.path.getsize(truncated_file), 0, "Truncated sequence file is empty"
)

# Check if the sequences are actually truncated
with gzip.open(truncated_file, "r") as f:
for line in f:
if line.startswith(b"@"): # FASTQ header
continue
if line.startswith(b"+"): # Quality score identifier
break
self.assertLessEqual(
len(line.strip()),
100,
"Sequence is longer than specified truncation length",
)


if __name__ == "__main__":
unittest.main()

0 comments on commit ab4dc63

Please sign in to comment.