From 618e082d5be4cf40dab8840506220fa45dfa4556 Mon Sep 17 00:00:00 2001 From: deliaBlue <103108590+deliaBlue@users.noreply.github.com> Date: Sun, 3 Dec 2023 23:03:00 +0100 Subject: [PATCH] style: reformat unit test modules (#125) * style: refactor unit test modules * refactor: use relative path for module import * build: add __init__.py * ci: modify pytest and flake8 calls * ci: add black call * style: correct style with black --- .github/workflows/tests.yml | 10 +- scripts/__init__.py | 1 + scripts/filter_multimappers.py | 57 ++- scripts/iso_name_tagging.py | 117 +++--- scripts/mirna_extension.py | 115 +++--- scripts/mirna_quantification.py | 189 +++++----- scripts/nh_filter.py | 18 +- scripts/oligomap_output_to_sam_nh_filtered.py | 129 ++++--- scripts/primir_quantification.py | 123 +++--- scripts/tests/__init__.py | 1 + scripts/tests/test_filter_multimappers.py | 152 ++++---- scripts/tests/test_iso_name_tagging.py | 172 +++++---- scripts/tests/test_mirna_extension.py | 242 ++++++------ scripts/tests/test_mirna_quantification.py | 352 +++++++++++------- ...test_oligomap_output_to_sam_nh_filtered.py | 318 +++++++++++----- scripts/tests/test_primir_quantification.py | 196 +++++----- scripts/validation_fasta.py | 135 +++---- 17 files changed, 1350 insertions(+), 977 deletions(-) create mode 100644 scripts/__init__.py create mode 100644 scripts/tests/__init__.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 48137d3b..bd81f5fd 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -38,7 +38,7 @@ jobs: - name: flake8 working-directory: ./scripts - run: flake8 ./*.py + run: flake8 - name: mypy working-directory: ./scripts @@ -48,6 +48,12 @@ jobs: working-directory: ./scripts run: pylint --rcfile=../pylint.cfg ./*.py + - name: black + uses: psf/black@stable + with: + options: "--check --verbose --line-length=79" + src: "./scripts" + snakemake-format-graph-test: runs-on: ubuntu-latest @@ -149,4 +155,4 @@ jobs: - name: run unit tests working-directory: ./scripts/tests - run: pytest --cov=scripts --cov-branch --cov-report=term-missing + run: pytest --import-mode prepend --cov=scripts --cov-branch --cov-report=term-missing diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 00000000..c9fb7ceb --- /dev/null +++ b/scripts/__init__.py @@ -0,0 +1 @@ +"""MIRFLOWZ scripts.""" diff --git a/scripts/filter_multimappers.py b/scripts/filter_multimappers.py index 56268fe4..3dd5f8e8 100755 --- a/scripts/filter_multimappers.py +++ b/scripts/filter_multimappers.py @@ -50,29 +50,30 @@ def parse_arguments(): """Command-line arguments parser.""" parser = argparse.ArgumentParser( description="Script to filter multimappers by indel counts." - ) + ) parser.add_argument( - '-v', '--version', - action='version', - version='%(prog)s 1.0', - help="Show program's version number and exit" - ) + "-v", + "--version", + action="version", + version="%(prog)s 1.0", + help="Show program's version number and exit", + ) parser.add_argument( - 'infile', + "infile", help="Path to the SAM input file, sorted by query name.", - type=Path - ) + type=Path, + ) parser.add_argument( - '--nh', + "--nh", help=( "If set, the NH tag will be include in the alignment name after" "and underscore. Default: %(default)s." ), - action='store_true', - default=False + action="store_true", + default=False, ) return parser @@ -101,7 +102,7 @@ def count_indels(aln: pysam.libcalignedsegment.AlignedSegment) -> int: def find_best_alignments( - alns: List[pysam.AlignedSegment], nh: bool = False + alns: List[pysam.AlignedSegment], nh: bool = False ) -> List[pysam.AlignedSegment]: """Find alignments with less indels. @@ -129,20 +130,18 @@ def find_best_alignments( aln_indels = [(aln, count_indels(aln=aln)) for aln in alns] min_indels = min(aln_indels, key=lambda x: x[1])[1] best_alignments = [ - aln - for i, (aln, indels) in enumerate(aln_indels) - if indels == min_indels] + aln + for i, (aln, indels) in enumerate(aln_indels) + if indels == min_indels + ] for i, best_aln in enumerate(best_alignments): - if nh: - name = ( - f'{best_aln.query_name}_{len(best_alignments)}' - ) + name = f"{best_aln.query_name}_{len(best_alignments)}" best_aln.query_name = name - best_aln.set_tag('NH', len(best_alignments)) - best_aln.set_tag('HI', i + 1) + best_aln.set_tag("NH", len(best_alignments)) + best_aln.set_tag("HI", i + 1) return best_alignments @@ -154,20 +153,18 @@ def write_output(alns: List[pysam.AlignedSegment]) -> None: alignments: alignments with the same query name """ for alignment in alns: - sys.stdout.write(alignment.to_string() + '\n') + sys.stdout.write(alignment.to_string() + "\n") def main(arguments) -> None: """Filter multimappers by indels count.""" with pysam.AlignmentFile(arguments.infile, "r") as samfile: - sys.stdout.write(str(samfile.header)) current_alignments: list[pysam.AlignedSegment] = [] current_query = None for alignment in samfile: - if alignment.is_supplementary: continue @@ -178,16 +175,18 @@ def main(arguments) -> None: current_alignments.append(alignment) else: - current_alignments = find_best_alignments(current_alignments, - arguments.nh) + current_alignments = find_best_alignments( + current_alignments, arguments.nh + ) write_output(alns=current_alignments) current_query = alignment.query_name current_alignments = [alignment] if len(current_alignments) > 0: - current_alignments = find_best_alignments(current_alignments, - arguments.nh) + current_alignments = find_best_alignments( + current_alignments, arguments.nh + ) write_output(alns=current_alignments) diff --git a/scripts/iso_name_tagging.py b/scripts/iso_name_tagging.py index 47c9c66a..b2cb08bb 100755 --- a/scripts/iso_name_tagging.py +++ b/scripts/iso_name_tagging.py @@ -92,47 +92,51 @@ def parse_arguments(): """Command-line arguments parser.""" parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( - '-v', '--version', - action='version', - version='%(prog)s 1.0.0', - help="Show program's version number and exit" + "-v", + "--version", + action="version", + version="%(prog)s 1.0.0", + help="Show program's version number and exit", ) parser.add_argument( - '-b', '--bed', + "-b", + "--bed", help=( "Path to the BED file. This file must be the output of " " a bedtools intersect call with -a being a GFF3 file and" " -b a BAM file." ), type=Path, - required=True + required=True, ) parser.add_argument( - '-s', '--sam', + "-s", + "--sam", help="Path to the SAM input file.", type=Path, - required=True + required=True, ) parser.add_argument( - '-e', '--extension', + "-e", + "--extension", help=( "Number of nucleotides the start and end coordinates of the" " annotated features had been extended. Default: %(default)d." ), default=0, - type=int + type=int, ) parser.add_argument( - '--id', + "--id", help=( "ID used to identify the feature in the name that is added as tag." " The ID must be in lowercase. Default: %(default)s." ), default="name", - type=str + type=str, ) return parser @@ -140,22 +144,21 @@ def parse_arguments(): def attributes_dictionary(attr: str) -> Dict[str, str]: """Create attributes dicctionary.""" - pairs = attr.split(';') + pairs = attr.split(";") - if len(pairs[0].split('=')) == 2: - attr_dict = {p.split('=')[0].lower(): p.split('=')[1] for p in pairs} + if len(pairs[0].split("=")) == 2: + attr_dict = {p.split("=")[0].lower(): p.split("=")[1] for p in pairs} else: attr_dict = { - p.split('"')[0].strip().lower(): p.split('"')[1] - for p in pairs} + p.split('"')[0].strip().lower(): p.split('"')[1] for p in pairs + } return attr_dict def parse_intersect_output( - intersect_file: Path, - ID: str = "name", - extension: int = 0) -> Optional[Dict[Optional[str], list]]: + intersect_file: Path, ID: str = "name", extension: int = 0 +) -> Optional[Dict[Optional[str], list]]: """Parse intersect BED file. Given a BED file generated by intersecting a GFF file (-a) with a BAM file @@ -176,25 +179,39 @@ def parse_intersect_output( adjusted. Defaults to 0. """ intersect_data = defaultdict(list) - Fields = namedtuple('Fields', ("feat_chr", "source", "feat_type", - "feat_start", "feat_end", "feat_score", - "strand", "phase", "feat_attributes", - "read_chr", "read_start", "read_end", - "read_name", "read_score", "read_strand", - "overlap_len")) - - with open(intersect_file, 'r', encoding="utf-8") as bedfile: - for line in bedfile: + Fields = namedtuple( + "Fields", + ( + "feat_chr", + "source", + "feat_type", + "feat_start", + "feat_end", + "feat_score", + "strand", + "phase", + "feat_attributes", + "read_chr", + "read_start", + "read_end", + "read_name", + "read_score", + "read_strand", + "overlap_len", + ), + ) - fields = Fields(*line.strip().split('\t')) + with open(intersect_file, "r", encoding="utf-8") as bedfile: + for line in bedfile: + fields = Fields(*line.strip().split("\t")) miRNA_name = attributes_dictionary(fields.feat_attributes)[ID] miRNA_start = int(fields.feat_start) + extension miRNA_end = int(fields.feat_end) - extension - intersect_data[fields.read_name].append((miRNA_name, - miRNA_start, - miRNA_end)) + intersect_data[fields.read_name].append( + (miRNA_name, miRNA_start, miRNA_end) + ) if not intersect_data: return None @@ -203,10 +220,8 @@ def parse_intersect_output( def get_tags( - intersecting_mirna: list, - alignment: pysam.AlignedSegment, - extension: int - ) -> set: + intersecting_mirna: list, alignment: pysam.AlignedSegment, extension: int +) -> set: """Get tag for alignment. Given an alignment and a list containing the feature name, start position, @@ -233,7 +248,7 @@ def get_tags( set of strings containing the new tag """ cigar = alignment.cigarstring - md = alignment.get_tag('MD') + md = alignment.get_tag("MD") limit = extension + 1 tags = [] @@ -243,19 +258,18 @@ def get_tags( shift_3p = alignment.reference_end - miRNA_end if -limit < shift_5p < limit and -limit < shift_3p < limit: - tags.append(f'{miRNA_name}|{shift_5p}|{shift_3p}|{cigar}|{md}') + tags.append(f"{miRNA_name}|{shift_5p}|{shift_3p}|{cigar}|{md}") return set(tags) def main(arguments) -> None: """Add intersecting feature(s) into a SAM file as a tag.""" - intersect_data = parse_intersect_output(arguments.bed, - arguments.id, - arguments.extension) - - with pysam.AlignmentFile(arguments.sam, 'r') as samfile: + intersect_data = parse_intersect_output( + arguments.bed, arguments.id, arguments.extension + ) + with pysam.AlignmentFile(arguments.sam, "r") as samfile: sys.stdout.write(str(samfile.header)) if intersect_data is None: @@ -265,15 +279,16 @@ def main(arguments) -> None: alignment_id = alignment.query_name intersecting_miRNAs = intersect_data[alignment_id] - tags = get_tags(intersecting_mirna=intersecting_miRNAs, - alignment=alignment, - extension=arguments.extension) + tags = get_tags( + intersecting_mirna=intersecting_miRNAs, + alignment=alignment, + extension=arguments.extension, + ) - alignment.set_tag('YW', ';'.join(tags)) - sys.stdout.write(alignment.to_string() + '\n') + alignment.set_tag("YW", ";".join(tags)) + sys.stdout.write(alignment.to_string() + "\n") if __name__ == "__main__": - args = parse_arguments().parse_args() # pragma: no cover main(args) # pragma: no cover diff --git a/scripts/mirna_extension.py b/scripts/mirna_extension.py index f9c23c24..01303a74 100755 --- a/scripts/mirna_extension.py +++ b/scripts/mirna_extension.py @@ -25,7 +25,7 @@ import gffutils # type: ignore -class MirnaExtension(): +class MirnaExtension: """Class to extend miRNAs start and end coordinates. Attributes: @@ -55,18 +55,21 @@ def load_gff_file(self, gff_file: Optional[Path] = None) -> None: standard input (stdin). """ if gff_file is None: - self.db = gffutils.create_db(sys.stdin, dbfn=':memory:', - force=True, keep_order=True) + self.db = gffutils.create_db( + sys.stdin, dbfn=":memory:", force=True, keep_order=True + ) else: - self.db = gffutils.create_db(str(gff_file), dbfn=':memory:', - force=True, keep_order=True) + self.db = gffutils.create_db( + str(gff_file), dbfn=":memory:", force=True, keep_order=True + ) def extend_mirnas( - self, - primir_out: Path, - mir_out: Path, - n: int = 6, - seq_lengths: Optional[dict[str, int]] = None) -> None: + self, + primir_out: Path, + mir_out: Path, + n: int = 6, + seq_lengths: Optional[dict[str, int]] = None, + ) -> None: """Extend miRNAs start and end coordinates. This method elongates the start and end coordinates of mature miRNAs @@ -94,26 +97,31 @@ def extend_mirnas( seq_lengths = {} for seqid in self.db.seqids(): seq_lengths[seqid] = max( - rec.end - for rec in self.db.region(seqid)) - - with (open(primir_out, 'w', encoding="utf-8") as primir, - open(mir_out, 'w', encoding="utf-8") as mirna): - - for primary_mirna in ( - self.db.features_of_type('miRNA_primary_transcript')): + rec.end for rec in self.db.region(seqid) + ) + + with ( + open(primir_out, "w", encoding="utf-8") as primir, + open(mir_out, "w", encoding="utf-8") as mirna, + ): + for primary_mirna in self.db.features_of_type( + "miRNA_primary_transcript" + ): seqid = primary_mirna.seqid start = int(primary_mirna.start) end = int(primary_mirna.end) - mature_miRNAs = list(self.db.region(seqid=seqid, - start=start, - end=end, - featuretype='miRNA', - completely_within=True)) + mature_miRNAs = list( + self.db.region( + seqid=seqid, + start=start, + end=end, + featuretype="miRNA", + completely_within=True, + ) + ) if mature_miRNAs: - for mir in mature_miRNAs: if mir.start - n > 0: mir.start -= n @@ -131,53 +139,55 @@ def extend_mirnas( if mir.end > end: primary_mirna.end = mir.end - mirna.write(str(mir) + '\n') + mirna.write(str(mir) + "\n") start_diff = start - primary_mirna.start end_diff = primary_mirna.end - end primary_mirna.attributes["Name"][0] += f"_-{start_diff}" primary_mirna.attributes["Name"][0] += f"_+{end_diff}" - primir.write(str(primary_mirna) + '\n') + primir.write(str(primary_mirna) + "\n") def parse_arguments(): """Command-line arguments parser.""" parser = argparse.ArgumentParser( description="Script to extend miRNAs start and end coordinates." - ) + ) parser.add_argument( - '-v', '--version', - action='version', - version='%(prog)s 1.0', - help="Show program's version number and exit" + "-v", + "--version", + action="version", + version="%(prog)s 1.0", + help="Show program's version number and exit", ) parser.add_argument( - 'input', + "input", help="Path to the GFF3 annotation file. If not provided, the input \ will be read from the standard input.", - type=Path + type=Path, ) parser.add_argument( - '--outdir', + "--outdir", help="Path to the output directory. Default: %(default)s.", default=Path.cwd(), - type=Path + type=Path, ) parser.add_argument( - '-e', '--extension', + "-e", + "--extension", help="Number of nucleotides to extend the coordinates. Default: \ %(default)d.", default=6, - type=int + type=int, ) parser.add_argument( - '--chr', + "--chr", help="Path to the tabulated file containing the chromosome and its \ length in basepairs. If not provided, the length will be set to \ the biggest coordinate of the last miRNA primary transcript.", default=None, - type=Path + type=Path, ) return parser @@ -188,15 +198,19 @@ def main(arguments) -> None: outdir = Path(arguments.outdir) outdir.mkdir(parents=True, exist_ok=True) - primir_out = outdir/( - f"extended_primir_annotation_{arguments.extension}_nt.gff3" - ) - mir_out = outdir/f"extended_mirna_annotation_{arguments.extension}_nt.gff3" + primir_out = outdir / ( + f"extended_primir_annotation_{arguments.extension}_nt.gff3" + ) + mir_out = ( + outdir / f"extended_mirna_annotation_{arguments.extension}_nt.gff3" + ) with open(arguments.input, encoding="utf-8") as in_file: if len(in_file.read()) == 0: - with (open(primir_out, 'w', encoding="utf-8") as primir, - open(mir_out, 'w', encoding="utf-8") as mir): + with ( + open(primir_out, "w", encoding="utf-8") as primir, + open(mir_out, "w", encoding="utf-8") as mir, + ): primir.write("") mir.write("") return @@ -212,13 +226,14 @@ def main(arguments) -> None: m = MirnaExtension() m.load_gff_file(arguments.input) - m.extend_mirnas(n=arguments.extension, - seq_lengths=seq_lengths, - primir_out=primir_out, - mir_out=mir_out) + m.extend_mirnas( + n=arguments.extension, + seq_lengths=seq_lengths, + primir_out=primir_out, + mir_out=mir_out, + ) if __name__ == "__main__": - args = parse_arguments().parse_args() # pragma: no cover main(args) # pragma: no cover diff --git a/scripts/mirna_quantification.py b/scripts/mirna_quantification.py index e1d6ac48..9f4ed88a 100755 --- a/scripts/mirna_quantification.py +++ b/scripts/mirna_quantification.py @@ -128,39 +128,40 @@ def parse_arguments(): """Command-line arguments parser.""" parser = argparse.ArgumentParser( description=__doc__, - formatter_class=argparse.RawDescriptionHelpFormatter + formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( - '-v', '--version', - action='version', - version='%(prog)s 1.0.0', - help="Show program's version number and exit." + "-v", + "--version", + action="version", + version="%(prog)s 1.0.0", + help="Show program's version number and exit.", ) parser.add_argument( - 'samfile', + "samfile", help=( "Path to the SAM file containing the intersecting miRNA name(s)." ), - type=Path + type=Path, ) parser.add_argument( - '--outdir', + "--outdir", help="Path to the output directory. Default: %(default)s.", default=Path.cwd(), - type=Path + type=Path, ) parser.add_argument( - '--mir-list', + "--mir-list", help=( "List of miRNA types to have in the output table." " Default: %(default)s." ), - nargs='*', - default=['isomir', 'mirna'], - type=str + nargs="*", + default=["isomir", "mirna"], + type=str, ) parser.add_argument( - '--lib', + "--lib", help=( "Library to which the alignments belong to. Default: %(default)s." ), @@ -168,16 +169,18 @@ def parse_arguments(): default="lib", ) parser.add_argument( - '-t', '--tag', + "-t", + "--tag", help=( "Indicate the tag storing the intersecting miRNA name." " Default: %(default)s." ), - default='YW', - type=str + default="YW", + type=str, ) parser.add_argument( - '-c', '--collapsed', + "-c", + "--collapsed", help=( "Indicate that the SAM file has the reads collapsed by sequence." "In that case, the SAM query names are expected to follow the " @@ -188,11 +191,11 @@ def parse_arguments(): "FASTX-Toolkit: http://hannonlab.cshl.edu/fastx_toolkit/" " Default: %(default)s." ), - action='store_true', - default=False + action="store_true", + default=False, ) parser.add_argument( - '--nh', + "--nh", help=( "Indicate that the SAM file has the NH value at the end of the" " read query name. In that case, SAM query names are expected to" @@ -200,35 +203,35 @@ def parse_arguments(): " name and NH is the NH value, i.e my_query_name_4. Default:" " %(default)s." ), - action='store_true', - default=False + action="store_true", + default=False, ) parser.add_argument( - '--count', + "--count", help=( "If set, the amount of best alignments for each miRNA is included" " in the output table. Default: %(default)s" ), - action='store_true', - default=False + action="store_true", + default=False, ) parser.add_argument( - '--len', + "--len", help=( "If set, the miRNA length is included in the output table." " Default: %(default)s." ), - action='store_true', - default=False + action="store_true", + default=False, ) parser.add_argument( - '--read-ids', + "--read-ids", help=( "If set, the read IDs that belong to each miRNA are included in " "the output table separated by a semi-colon. Default: %(default)s." ), - action='store_true', - default=False + action="store_true", + default=False, ) return parser @@ -251,16 +254,18 @@ def collapsed_nh_contribution(aln: pysam.AlignedSegment) -> float: """ name = str(aln.query_name) try: - if (val := re.search(r'\d+_\d+$', name)): - values = val.group().split('_') + if val := re.search(r"\d+_\d+$", name): + values = val.group().split("_") return float(values[0]) / float(values[1]) except AttributeError: - sys.stdout.write(f"Invalid query name: '{aln.query_name}'.\n" + - "Cannot calculate contribution.\n" + - "Check the SAM file validity and CLI options" + - " --collapsed and --nh.\n") + sys.stdout.write( + f"Invalid query name: '{aln.query_name}'.\n" + + "Cannot calculate contribution.\n" + + "Check the SAM file validity and CLI options" + + " --collapsed and --nh.\n" + ) raise @@ -281,20 +286,22 @@ def collapsed_contribution(aln: pysam.AlignedSegment) -> float: """ name = str(aln.query_name) try: - if (coll := re.search(r'\d+$', name)): + if coll := re.search(r"\d+$", name): collapsed = float(coll.group()) except AttributeError: - sys.stdout.write(f"Invalid query name: '{aln.query_name}'.\n" + - "Option --collapsed specified but query name does" + - " not include the number of collapsed sequences.\n" + - "Check SAM file consistency and CLI options" + - " --collapsed and --nh.\n") + sys.stdout.write( + f"Invalid query name: '{aln.query_name}'.\n" + + "Option --collapsed specified but query name does" + + " not include the number of collapsed sequences.\n" + + "Check SAM file consistency and CLI options" + + " --collapsed and --nh.\n" + ) raise try: nh_value = float(aln.get_tag("NH")) - return collapsed/nh_value + return collapsed / nh_value except KeyError: return collapsed @@ -317,17 +324,19 @@ def nh_contribution(aln: pysam.AlignedSegment) -> float: """ name = str(aln.query_name) try: - if (cont := re.search(r'\d+$', name)): + if cont := re.search(r"\d+$", name): nh_val = float(cont.group()) - return 1/nh_val + return 1 / nh_val except AttributeError: - sys.stdout.write(f"Invalid query name: '{aln.query_name}'.\n" + - "Option --nh specified but query name does" + - " not include NH.\n" + - "Check SAM file consistency and CLI options" + - " --collapsed and --nh.\n") + sys.stdout.write( + f"Invalid query name: '{aln.query_name}'.\n" + + "Option --nh specified but query name does" + + " not include NH.\n" + + "Check SAM file consistency and CLI options" + + " --collapsed and --nh.\n" + ) raise @@ -346,7 +355,7 @@ def contribution(aln: pysam.AlignedSegment) -> float: the conrtibution of the alignment to the overall count """ try: - return 1/float(aln.get_tag("NH")) + return 1 / float(aln.get_tag("NH")) except KeyError: return 1.0 @@ -368,44 +377,42 @@ def get_name(pre_name: str) -> list[str]: list with the species name to be found in the final table and its type """ data_name = pre_name.split("|") - cigar = re.sub(r'[^0-9]', '', data_name[3]) - md = re.sub(r'[^0-9]', '', data_name[4]) + cigar = re.sub(r"[^0-9]", "", data_name[3]) + md = re.sub(r"[^0-9]", "", data_name[4]) - if data_name[1] == '0' and data_name[2] == '0' and cigar == md: - return ['mirna', data_name[0]] + if data_name[1] == "0" and data_name[2] == "0" and cigar == md: + return ["mirna", data_name[0]] - return ['isomir', pre_name] + return ["isomir", pre_name] def write_output( - name: str, - species: list[str], - mir_list: list[str], - mirna_out: Path) -> None: + name: str, species: list[str], mir_list: list[str], mirna_out: Path +) -> None: """Write to the output the correct miRNA type.""" - with open(mirna_out, 'a', encoding="utf-8") as mirna: + with open(mirna_out, "a", encoding="utf-8") as mirna: if name in mir_list: - mirna.write('\t'.join(species) + '\n') + mirna.write("\t".join(species) + "\n") else: - mirna.write('') + mirna.write("") def main(arguments) -> None: """Quantify miRNAs and corresponding isomiRs.""" outdir = Path(arguments.outdir) outdir.mkdir(parents=True, exist_ok=True) - outfile = outdir/f'mirna_counts_{arguments.lib}' + outfile = outdir / f"mirna_counts_{arguments.lib}" contribution_type = { - (True, True): collapsed_nh_contribution, - (True, False): collapsed_contribution, - (False, True): nh_contribution, - (False, False): contribution} + (True, True): collapsed_nh_contribution, + (True, False): collapsed_contribution, + (False, True): nh_contribution, + (False, False): contribution, + } get_contribution = contribution_type[arguments.collapsed, arguments.nh] - with pysam.AlignmentFile(arguments.samfile, 'r') as samfile: - + with pysam.AlignmentFile(arguments.samfile, "r") as samfile: try: alignment = next(samfile) current_species = alignment.get_tag(arguments.tag) @@ -415,16 +422,17 @@ def main(arguments) -> None: alns_count = 1 except StopIteration: - write_output(name="", - species=[], - mir_list=arguments.mir_list, - mirna_out=outfile) + write_output( + name="", + species=[], + mir_list=arguments.mir_list, + mirna_out=outfile, + ) return for alignment in samfile: - - if current_species == '': + if current_species == "": current_species = alignment.get_tag(arguments.tag) count = get_contribution(alignment) alns_count = 1 @@ -446,12 +454,14 @@ def main(arguments) -> None: if arguments.len: species.append(str(alignment.query_alignment_length)) if arguments.read_ids: - species.append(';'.join(read_ID)) + species.append(";".join(read_ID)) - write_output(name=name[0], - species=species, - mir_list=arguments.mir_list, - mirna_out=outfile) + write_output( + name=name[0], + species=species, + mir_list=arguments.mir_list, + mirna_out=outfile, + ) current_species = alignment.get_tag(arguments.tag) count = get_contribution(alignment) @@ -466,15 +476,16 @@ def main(arguments) -> None: if arguments.len: species.append(str(alignment.query_alignment_length)) if arguments.read_ids: - species.append(';'.join(read_ID)) + species.append(";".join(read_ID)) - write_output(name=name[0], - species=species, - mir_list=arguments.mir_list, - mirna_out=outfile) + write_output( + name=name[0], + species=species, + mir_list=arguments.mir_list, + mirna_out=outfile, + ) if __name__ == "__main__": - args = parse_arguments().parse_args() # pragma: no cover main(args) # pragma: no cover diff --git a/scripts/nh_filter.py b/scripts/nh_filter.py index 66cee492..d55f937a 100755 --- a/scripts/nh_filter.py +++ b/scripts/nh_filter.py @@ -12,10 +12,12 @@ import sys import pysam -if sys.argv[1] in ['--help', '-h', '-help']: - sys.exit("\nDescription: Checks for NH tag to remove reads that aligned " - "more than max_NH value.\nUsage: filter_nh.py [SAM file] [max_NH]" - "[OUTPUT file]\n") +if sys.argv[1] in ["--help", "-h", "-help"]: + sys.exit( + "\nDescription: Checks for NH tag to remove reads that aligned " + "more than max_NH value.\nUsage: filter_nh.py [SAM file] [max_NH]" + "[OUTPUT file]\n" + ) elif len(sys.argv) < 4 or len(sys.argv) > 4: sys.exit("\n Arguments ERROR. See [nh_filter.py --help]\n") @@ -23,8 +25,8 @@ def main(): """Filter alignments by NH tag.""" sys.stdout.write( - f"Removing reads aligned more than {sys.argv[2]} times... \n" - ) + f"Removing reads aligned more than {sys.argv[2]} times... \n" + ) infile = pysam.Samfile(sys.argv[1], "r", check_sq=False) out = pysam.Samfile(sys.argv[3], "w", template=infile) @@ -35,7 +37,7 @@ def main(): intags = DNAread.tags for entry in intags: - if 'NH' in entry and entry[1] > int(sys.argv[2]): + if "NH" in entry and entry[1] > int(sys.argv[2]): keep = False if keep: out.write(DNAread) @@ -46,5 +48,5 @@ def main(): sys.stdout.write("DONE!\n") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/scripts/oligomap_output_to_sam_nh_filtered.py b/scripts/oligomap_output_to_sam_nh_filtered.py index c0df9cab..dcb60a90 100755 --- a/scripts/oligomap_output_to_sam_nh_filtered.py +++ b/scripts/oligomap_output_to_sam_nh_filtered.py @@ -97,22 +97,23 @@ class Fields(NamedTuple): def parse_arguments(): """Command-line arguments parser.""" parser = ArgumentParser( - description=__doc__, - formatter_class=RawDescriptionHelpFormatter + description=__doc__, formatter_class=RawDescriptionHelpFormatter ) parser.add_argument( - '-v', '--version', + "-v", + "--version", action="version", version="%(prog)s 1.1.0", - help="Show program's version number and exit" + help="Show program's version number and exit", ) parser.add_argument( - 'infile', + "infile", help="Path to the FASTA file resulting from the oligomap mapping.", type=Path, ) parser.add_argument( - '-n', '--nh-filter', + "-n", + "--nh-filter", help=( "Add NH tag to output and remove reads that contain more " "aligments than the provided NH value (with min error)." @@ -123,8 +124,9 @@ def parse_arguments(): return parser -def get_cigar_md(errors: str, sequence: str, bars_line: str, - ref_seq: str) -> tuple[str, str]: +def get_cigar_md( + errors: str, sequence: str, bars_line: str, ref_seq: str +) -> tuple[str, str]: """Get the CIGAR and MD strings. Given the read and target sequences, the number of errors and the @@ -193,14 +195,14 @@ def get_cigar_md(errors: str, sequence: str, bars_line: str, """ seq_len = len(sequence) - if errors == '0': + if errors == "0": return f"{seq_len}M", f"MD:Z:{seq_len}" # CASE 1: deletion in the read - if '-' in sequence: + if "-" in sequence: indelerr = "1D" - if bars_line[0] == ' ': + if bars_line[0] == " ": cigarStr = f"{indelerr}{seq_len - 1}M" matchingString = f"MD:Z:^{ref_seq[0]}{seq_len - 1}" @@ -209,14 +211,14 @@ def get_cigar_md(errors: str, sequence: str, bars_line: str, matchingString = f"MD:Z:{seq_len - 1}^{ref_seq[seq_len - 1]}0" else: - idx = bars_line.index(' ') + idx = bars_line.index(" ") cigarStr = f"{idx}M{indelerr}{bars_line.count('|') - idx}M" matchingString = f"MD:Z:{idx}^{ref_seq[idx]}{seq_len - idx -1}" return cigarStr, matchingString # CASE 2: insertion in the read - if '-' in ref_seq: + if "-" in ref_seq: indelerr = "1I" if bars_line[0] == " ": @@ -226,7 +228,7 @@ def get_cigar_md(errors: str, sequence: str, bars_line: str, cigarStr = f"{seq_len - 1}M{indelerr}" else: - idx = bars_line.index(' ') + idx = bars_line.index(" ") cigarStr = f"{idx}M{indelerr}{bars_line.count('|') - idx}M" return cigarStr, f"MD:Z:{seq_len}" @@ -239,7 +241,7 @@ def get_cigar_md(errors: str, sequence: str, bars_line: str, matchingString = f"MD:Z:{seq_len - 1}{ref_seq[seq_len - 1]}" else: - idx = bars_line.index(' ') + idx = bars_line.index(" ") matchingString = f"MD:Z:{idx}{ref_seq[idx]}{seq_len - idx - 1}" return f"{seq_len}M", matchingString @@ -285,19 +287,31 @@ def get_sam_fields(aln: list[str]) -> Fields: cigar, md = get_cigar_md(errors, seq, aln[4][:-1], aln[5].strip()) - fields = Fields(seq_name_pos[0], - '0' if aln[2].split()[3] == '+' else "16", - aln[1].strip(), - seq_name_pos[5].split('.')[0], - "255", cigar, '*', '0', '0', - re.sub('-', '', seq), '*', - "NM:i:0" if errors == '0' else "NM:i:1", md) + fields = Fields( + seq_name_pos[0], + "0" if aln[2].split()[3] == "+" else "16", + aln[1].strip(), + seq_name_pos[5].split(".")[0], + "255", + cigar, + "*", + "0", + "0", + re.sub("-", "", seq), + "*", + "NM:i:0" if errors == "0" else "NM:i:1", + md, + ) return fields -def eval_aln(nhfilter: int, d: Dict[str, list], min_err_nh: Dict[str, list], - fields: Fields) -> None: +def eval_aln( + nhfilter: int, + d: Dict[str, list], + min_err_nh: Dict[str, list], + fields: Fields, +) -> None: """Evaluate an alignment to add, discard or write it to the STDOUT. Given a read's alignment, this function first checks if the dictionary @@ -339,9 +353,10 @@ def eval_aln(nhfilter: int, d: Dict[str, list], min_err_nh: Dict[str, list], errors = fields.edit_dist[-1] if len(d) == 0: - if (seq_name not in list(min_err_nh.keys()) or - errors < min_err_nh[seq_name][0]): - + if ( + seq_name not in list(min_err_nh.keys()) + or errors < min_err_nh[seq_name][0] + ): min_err_nh[seq_name] = [errors, 1] d[seq_name] = [fields] else: @@ -350,33 +365,43 @@ def eval_aln(nhfilter: int, d: Dict[str, list], min_err_nh: Dict[str, list], min_err_nh[seq_name][1] += 1 if nhfilter: - if min_err_nh[seq_name][1] <= nhfilter: d[seq_name].append(fields) else: d.clear() - sys.stderr.write(f"Filtered by NH | Read {seq_name}" + - f" | Errors = {errors}\n") + sys.stderr.write( + f"Filtered by NH | Read {seq_name}" + + f" | Errors = {errors}\n" + ) else: d[seq_name].append(fields) elif errors < min_err_nh[seq_name][0]: - sys.stderr.write(f"Filtered by ERROR | Read {seq_name}" + - f" | Errors = {min_err_nh[seq_name][0]}\n") - - min_err_nh[seq_name] = [min(errors, min_err_nh[seq_name][0]), - 1] + sys.stderr.write( + f"Filtered by ERROR | Read {seq_name}" + + f" | Errors = {min_err_nh[seq_name][0]}\n" + ) + + min_err_nh[seq_name] = [ + min(errors, min_err_nh[seq_name][0]), + 1, + ] d[seq_name] = [fields] else: for seq, aln in d.items(): - sys.stderr.write(f"Written read {seq} | " + - f"Errors = {min_err_nh[seq][0]} | " + - f"NH = {min_err_nh[seq][1]}\n") + sys.stderr.write( + f"Written read {seq} | " + + f"Errors = {min_err_nh[seq][0]} | " + + f"NH = {min_err_nh[seq][1]}\n" + ) for field in aln: - sys.stdout.write('\t'.join(field) + - f"\tNH:i:{min_err_nh[seq][1]}" + '\n') + sys.stdout.write( + "\t".join(field) + + f"\tNH:i:{min_err_nh[seq][1]}" + + "\n" + ) d.clear() min_err_nh.clear() @@ -390,37 +415,37 @@ def main(arguments) -> None: read_seqs: Dict[str, list] = {} seq_min_error_nh: Dict[str, list] = {} - with open(arguments.infile, 'r', encoding="utf-8") as in_file: - + with open(arguments.infile, "r", encoding="utf-8") as in_file: sys.stderr.write("##############\nSTART READING...\n##############\n") lines = [in_file.readline() for _ in range(6)] i = 1 while lines[0] != "": - fields = get_sam_fields(lines) sys.stderr.write(f"Record:{i} | Sequence:{fields.read_name}\n") - eval_aln(arguments.nh_filter, read_seqs, seq_min_error_nh, - fields) + eval_aln(arguments.nh_filter, read_seqs, seq_min_error_nh, fields) i += 1 in_file.readline() lines = [in_file.readline() for _ in range(6)] if len(read_seqs) > 0: - for read_name, alignments in read_seqs.items(): - sys.stderr.write(f"Printed read {read_name} | Errors = " + - f"{seq_min_error_nh[read_name][0]} | " + - f"NH = {seq_min_error_nh[read_name][1]}\n") + sys.stderr.write( + f"Printed read {read_name} | Errors = " + + f"{seq_min_error_nh[read_name][0]} | " + + f"NH = {seq_min_error_nh[read_name][1]}\n" + ) for aln in alignments: - sys.stdout.write('\t'.join(aln) + - f"\tNH:i:{seq_min_error_nh[read_name][1]}" + - '\n') + sys.stdout.write( + "\t".join(aln) + + f"\tNH:i:{seq_min_error_nh[read_name][1]}" + + "\n" + ) sys.stderr.write("SUCCESSFULLY FINISHED.") diff --git a/scripts/primir_quantification.py b/scripts/primir_quantification.py index d4a48c38..2bae1162 100755 --- a/scripts/primir_quantification.py +++ b/scripts/primir_quantification.py @@ -30,74 +30,73 @@ def parse_arguments(): """Command-line arguments parser.""" - parser = argparse.ArgumentParser( - description=__doc__ - ) + parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( - '-v', '--version', - action='version', - version='%(prog)s 1.0', - help="Show program's version number and exit" + "-v", + "--version", + action="version", + version="%(prog)s 1.0", + help="Show program's version number and exit", ) parser.add_argument( - 'bedfile', + "bedfile", help=( "Path to the BED file. This file must be the output of " "a bedtools intersect call with -a being a BED file and" "-b a BAM file." ), - type=Path + type=Path, ) parser.add_argument( - '--collapsed', + "--collapsed", help=( "Indicate that the file used in bedtools intersect has the" "reads collapsed by sequence and alignment. The collapsed name" "must be build by the alignment name followed by a '-' and the" "number of collpased alignments, i.e 1-4. Default %(default)s." ), - action='store_true', - default=False + action="store_true", + default=False, ) parser.add_argument( - '--nh', + "--nh", help=( "Indicate that the file used in bedtools intersect has the" "NH tag in the read query name. The name must be build by the" "alignment name followed by an underscore and the NH value," "i.e 1-2_4. Default %(default)s." ), - action='store_true', - default=False + action="store_true", + default=False, ) parser.add_argument( - '--id', + "--id", help=( "ID used to identify the feature in the output table." "The ID must be in lowercase. Default: %(default)s." ), default="name", - type=str + type=str, ) parser.add_argument( - '--read-ids', + "--read-ids", help=( "Include read IDs of the alignments intersecting a feature in" "the output table. Default: %(default)s." ), - action='store_true', - default=False + action="store_true", + default=False, ) parser.add_argument( - '--feat-extension', + "--feat-extension", help=( "If any of the feature's coordinates had been extended, include" "the extension in the output table. It is assumed that the" "extensions are found within the feature id 'name' and separated" "by an underscore. Default: %(default)s." ), - action='store_true', - default=False + action="store_true", + default=False, ) return parser @@ -105,40 +104,39 @@ def parse_arguments(): def attributes_dictionary(attr: str) -> Dict[str, str]: """Create attributes dicctionary.""" - pairs = attr.split(';') + pairs = attr.split(";") - if len(pairs[0].split('=')) == 2: - attr_dict = {p.split('=')[0].lower(): p.split('=')[1] for p in pairs} + if len(pairs[0].split("=")) == 2: + attr_dict = {p.split("=")[0].lower(): p.split("=")[1] for p in pairs} else: attr_dict = { - p.split('"')[0].strip().lower(): p.split('"')[1] - for p in pairs - } + p.split('"')[0].strip().lower(): p.split('"')[1] for p in pairs + } return attr_dict -def get_contribution(query_id: str, - collapsed: bool = False, - nh: bool = False) -> float: +def get_contribution( + query_id: str, collapsed: bool = False, nh: bool = False +) -> float: """Get contribution of an alignment to the overall count.""" if collapsed and nh: - num_reads = int(query_id.split('-')[1].split('_')[0]) - nh_value = int(query_id.split('-')[1].split('_')[1]) + num_reads = int(query_id.split("-")[1].split("_")[0]) + nh_value = int(query_id.split("-")[1].split("_")[1]) elif not collapsed and nh: num_reads = 1 - nh_value = int(query_id.split('_')[1]) + nh_value = int(query_id.split("_")[1]) elif collapsed and not nh: - num_reads = int(query_id.split('-')[1]) + num_reads = int(query_id.split("-")[1]) nh_value = 1 else: num_reads = 1 nh_value = 1 - return num_reads/nh_value + return num_reads / nh_value def get_initial_data(name: str, feat_extension: bool) -> list[str]: @@ -157,10 +155,10 @@ def get_initial_data(name: str, feat_extension: bool) -> list[str]: number of extended positions (if asked for) """ if feat_extension: - feat_data = name.split('_') + feat_data = name.split("_") if len(feat_data) == 1: - feat_data.extend(['NA', 'NA']) + feat_data.extend(["NA", "NA"]) else: feat_data = [name] @@ -169,26 +167,38 @@ def get_initial_data(name: str, feat_extension: bool) -> list[str]: def main(arguments) -> None: """Tabulate a bedtools intersect BED file.""" - with open(arguments.bedfile, 'r', encoding="utf-8") as bedfile: - - Fields = namedtuple('Fields', - ("feat_chr", "source", "feat_type", - "feat_start", "feat_end", "feat_score", - "strand", "phase", "feat_attributes", - "read_chr", "read_start", "read_end", - "read_name", "read_score", "read_strand")) + with open(arguments.bedfile, "r", encoding="utf-8") as bedfile: + Fields = namedtuple( + "Fields", + ( + "feat_chr", + "source", + "feat_type", + "feat_start", + "feat_end", + "feat_score", + "strand", + "phase", + "feat_attributes", + "read_chr", + "read_start", + "read_end", + "read_name", + "read_score", + "read_strand", + ), + ) count = 0.0 current_name = None read_ID = [] for line in bedfile: - - fields = Fields(*line.strip().split('\t')) + fields = Fields(*line.strip().split("\t")) name = attributes_dictionary(fields.feat_attributes)[arguments.id] - contribution = get_contribution(fields.read_name, - arguments.collapsed, - arguments.nh) + contribution = get_contribution( + fields.read_name, arguments.collapsed, arguments.nh + ) if current_name is None: current_name = name @@ -202,9 +212,9 @@ def main(arguments) -> None: feat_data.insert(1, str(count)) if arguments.read_ids: - feat_data.append(';'.join(sorted(read_ID))) + feat_data.append(";".join(sorted(read_ID))) - sys.stdout.write('\t'.join(feat_data) + '\n') + sys.stdout.write("\t".join(feat_data) + "\n") feat_data = get_initial_data(name, arguments.feat_extension) @@ -216,12 +226,11 @@ def main(arguments) -> None: feat_data.insert(1, str(count)) if arguments.read_ids: - feat_data.append(';'.join(sorted(read_ID))) + feat_data.append(";".join(sorted(read_ID))) - sys.stdout.write('\t'.join(feat_data) + '\n') + sys.stdout.write("\t".join(feat_data) + "\n") if __name__ == "__main__": - args = parse_arguments().parse_args() # pragma: no cover main(args) # pragma: no cover diff --git a/scripts/tests/__init__.py b/scripts/tests/__init__.py new file mode 100644 index 00000000..2370e19e --- /dev/null +++ b/scripts/tests/__init__.py @@ -0,0 +1 @@ +"""Test package for MIRFLOWZ.""" diff --git a/scripts/tests/test_filter_multimappers.py b/scripts/tests/test_filter_multimappers.py index 8f4d9d82..3dc96ac1 100755 --- a/scripts/tests/test_filter_multimappers.py +++ b/scripts/tests/test_filter_multimappers.py @@ -7,14 +7,12 @@ import pysam import pytest -sys.path.append("../../") - -from scripts.filter_multimappers import ( +from ..filter_multimappers import ( count_indels, find_best_alignments, main, parse_arguments, - write_output + write_output, ) @@ -22,7 +20,7 @@ def sam_empty_file(): """Import path to empty test file.""" empty_file = Path("files/header_only.sam") - + return empty_file @@ -42,6 +40,7 @@ def sam_no_multimappers_file(): return no_multi + @pytest.fixture def sam_unique_diff_multimappers_files(): """Import path to test files with a single multimapper.""" @@ -50,6 +49,7 @@ def sam_unique_diff_multimappers_files(): return in_diff_multi, out_diff_multi + @pytest.fixture def sam_unique_equal_multimapper_files(): """Import path to the test file with a single multimapper.""" @@ -58,21 +58,19 @@ def sam_unique_equal_multimapper_files(): return in_sam, out_sam + @pytest.fixture def sam_sec_sup_files(): - """ - Import path to the test files with secondary and supplementary alignments. - """ + """Import path to the test files with secondary and supp. alignments.""" in_sam = Path("files/in_sec_sup.sam") out_sam = Path("files/sec_sup.sam") return in_sam, out_sam + @pytest.fixture def sam_multimappers_nh_files(): - """ - Import path to test files with multimappers and the NH tag in the query name. - """ + """Import path to test files with multimappers and NH tag in the name.""" in_multimappers = Path("files/in_multimappers.sam") out_multimappers = Path("files/multimappers_nh.sam") @@ -145,10 +143,7 @@ class TestParseArguments: def test_no_input(self, monkeypatch): """Call without input file.""" with pytest.raises(SystemExit) as sysex: - monkeypatch.setattr( - sys, 'argv', - ['filter_multimappers'] - ) + monkeypatch.setattr(sys, "argv", ["filter_multimappers"]) parse_arguments().parse_args() assert sysex.value.code == 2 @@ -156,23 +151,27 @@ def test_correct_input(self, monkeypatch, sam_no_multimappers_file): """Call with a single input file.""" sam_1 = sam_no_multimappers_file monkeypatch.setattr( - sys, 'argv', - ['filter_multimappers', - str(sam_1), - ] + sys, + "argv", + [ + "filter_multimappers", + str(sam_1), + ], ) args = parse_arguments().parse_args() assert isinstance(args, argparse.Namespace) - + def test_all_input_options(self, monkeypatch, sam_no_multimappers_file): """Call with a single input file and the --nh option.""" sam_1 = sam_no_multimappers_file monkeypatch.setattr( - sys, 'argv', - ['filter_multimappers', - str(sam_1), - '--nh', - ] + sys, + "argv", + [ + "filter_multimappers", + str(sam_1), + "--nh", + ], ) args = parse_arguments().parse_args() assert isinstance(args, argparse.Namespace) @@ -181,10 +180,13 @@ def test_too_many_inputs(self, monkeypatch, sam_multimappers_files): """Call with too many input files.""" sam_1, sam_2 = sam_multimappers_files monkeypatch.setattr( - sys, 'argv', - ['filter_multimappers', - str(sam_1), str(sam_2), - ] + sys, + "argv", + [ + "filter_multimappers", + str(sam_1), + str(sam_2), + ], ) with pytest.raises(SystemExit) as sysex: parse_arguments().parse_args() @@ -236,7 +238,7 @@ def test_find_best_alignments_equal_multimappers(self, alns): assert output[1].get_tag("NH") == 2 assert output[0].get_tag("HI") == 1 assert output[1].get_tag("HI") == 2 - + def test_find_best_alignments_multimappers_nh(self, alns): """Test function with multimappers with different indel count.""" output = find_best_alignments([alns[0], alns[1]], True) @@ -272,16 +274,16 @@ def test_write_output_one_alignment(self, capsys, sam_multimappers_files): """Test funciton with a single alignment.""" in_sam, out_sam = sam_multimappers_files - with pysam.AlignmentFile(in_sam, 'r') as in_file: + with pysam.AlignmentFile(in_sam, "r") as in_file: alignment = next(in_file) - + write_output([alignment]) captured = capsys.readouterr() - with pysam.AlignmentFile(out_sam, 'r') as out_file: + with pysam.AlignmentFile(out_sam, "r") as out_file: out_alignment = next(out_file) - - assert captured.out == out_alignment.to_string() + '\n' + + assert captured.out == out_alignment.to_string() + "\n" class TestMain: @@ -292,83 +294,101 @@ def test_main_empty_file(self, capsys, monkeypatch, sam_empty_file): empty_file = sam_empty_file monkeypatch.setattr( - sys, 'argv', - ['filter_multimappers', - str(empty_file), - ] + sys, + "argv", + [ + "filter_multimappers", + str(empty_file), + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(empty_file, 'r') as out_file: + with open(empty_file, "r") as out_file: assert captured.out == out_file.read() - def test_main_multimappers(self, capsys, monkeypatch, sam_multimappers_files): + def test_main_multimappers( + self, capsys, monkeypatch, sam_multimappers_files + ): """Test main function with multimappers.""" in_sam, out_sam = sam_multimappers_files monkeypatch.setattr( - sys, 'argv', - ['filter_multimappers', - str(in_sam), - ] + sys, + "argv", + [ + "filter_multimappers", + str(in_sam), + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(out_sam, 'r') as out_file: + with open(out_sam, "r") as out_file: assert captured.out == out_file.read() - def test_main_multimappers_nh(self, capsys, monkeypatch, sam_multimappers_nh_files): + def test_main_multimappers_nh( + self, capsys, monkeypatch, sam_multimappers_nh_files + ): """Test main function with multimappers with nh argument.""" in_sam, out_sam = sam_multimappers_nh_files monkeypatch.setattr( - sys, 'argv', - ['filter_multimappers', - str(in_sam), - '--nh', - ] + sys, + "argv", + [ + "filter_multimappers", + str(in_sam), + "--nh", + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(out_sam, 'r') as out_file: + with open(out_sam, "r") as out_file: assert captured.out == out_file.read() - def test_main_no_multimappers(self, capsys, monkeypatch, sam_no_multimappers_file): + def test_main_no_multimappers( + self, capsys, monkeypatch, sam_no_multimappers_file + ): """Test main function with no multimappers.""" sam_file = sam_no_multimappers_file monkeypatch.setattr( - sys, 'argv', - ['filter_multimappers', - str(sam_file), - ] + sys, + "argv", + [ + "filter_multimappers", + str(sam_file), + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(sam_file, 'r') as out_file: + with open(sam_file, "r") as out_file: assert captured.out == out_file.read() - - def test_main_secondary_supplementary(self, capsys, monkeypatch, sam_sec_sup_files): + + def test_main_secondary_supplementary( + self, capsys, monkeypatch, sam_sec_sup_files + ): """Test main function with secondary and supplementary alignments.""" in_sam, out_sam = sam_sec_sup_files monkeypatch.setattr( - sys, 'argv', - ['filter_multimappers', - str(in_sam), - ] + sys, + "argv", + [ + "filter_multimappers", + str(in_sam), + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(out_sam, 'r') as out_file: + with open(out_sam, "r") as out_file: assert captured.out == out_file.read() diff --git a/scripts/tests/test_iso_name_tagging.py b/scripts/tests/test_iso_name_tagging.py index 0f322b24..ef1881c7 100644 --- a/scripts/tests/test_iso_name_tagging.py +++ b/scripts/tests/test_iso_name_tagging.py @@ -6,12 +6,7 @@ import pytest -sys.path.append("../../") - -from scripts.iso_name_tagging import ( - main, - parse_arguments -) +from ..iso_name_tagging import main, parse_arguments @pytest.fixture @@ -62,24 +57,30 @@ def test_no_bed(self, monkeypatch, bed_sam): with pytest.raises(SystemExit) as sysex: monkeypatch.setattr( - sys, 'argv', - ['iso_name_tagging', - '--sam', str(in_sam), - ] + sys, + "argv", + [ + "iso_name_tagging", + "--sam", + str(in_sam), + ], ) parse_arguments().parse_args() assert sysex.value.code == 2 - + def test_no_sam(self, monkeypatch, bed_sam): """Call without bed file.""" in_bed, in_sam, output = bed_sam with pytest.raises(SystemExit) as sysex: monkeypatch.setattr( - sys, 'argv', - ['iso_name_tagging', - '--bed', str(in_bed), - ] + sys, + "argv", + [ + "iso_name_tagging", + "--bed", + str(in_bed), + ], ) parse_arguments().parse_args() assert sysex.value.code == 2 @@ -89,27 +90,37 @@ def test_correct_input(self, monkeypatch, bed_sam): in_bed, in_sam, output = bed_sam monkeypatch.setattr( - sys, 'argv', - ['iso_name_tagging', - '--bed', str(in_bed), - '--sam', str(in_sam), - ] + sys, + "argv", + [ + "iso_name_tagging", + "--bed", + str(in_bed), + "--sam", + str(in_sam), + ], ) args = parse_arguments().parse_args() assert isinstance(args, argparse.Namespace) - + def test_all_input(self, monkeypatch, bed_sam): """Call with all the options.""" in_bed, in_sam, output = bed_sam monkeypatch.setattr( - sys, 'argv', - ['iso_name_tagging', - '--bed', str(in_bed), - '--sam', str(in_sam), - '--id', "alias", - '--extension', '6', - ] + sys, + "argv", + [ + "iso_name_tagging", + "--bed", + str(in_bed), + "--sam", + str(in_sam), + "--id", + "alias", + "--extension", + "6", + ], ) args = parse_arguments().parse_args() assert isinstance(args, argparse.Namespace) @@ -118,96 +129,123 @@ def test_all_input(self, monkeypatch, bed_sam): class TestMain: """Test 'main()' function.""" - def test_main_empty_bed_file(self, monkeypatch, capsys, empty_files, bed_sam): + def test_main_empty_bed_file( + self, monkeypatch, capsys, empty_files, bed_sam + ): """Test main function with an empty bed file.""" empty_bed, empty_sam = empty_files monkeypatch.setattr( - sys, 'argv', - ['iso_name_tagging', - '--bed', str(empty_bed), - '--sam', str(empty_sam), - ] + sys, + "argv", + [ + "iso_name_tagging", + "--bed", + str(empty_bed), + "--sam", + str(empty_sam), + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(empty_sam, 'r') as out_file: + with open(empty_sam, "r") as out_file: assert captured.out == out_file.read() - def test_main_empty_sam_file(self, monkeypatch, capsys, empty_files, bed_sam): + def test_main_empty_sam_file( + self, monkeypatch, capsys, empty_files, bed_sam + ): """Test main function with an empty sam file.""" empty_bed, empty_sam = empty_files in_bed, in_sam, output = bed_sam monkeypatch.setattr( - sys, 'argv', - ['iso_name_tagging', - '--bed', str(in_bed), - '--sam', str(empty_sam), - ] + sys, + "argv", + [ + "iso_name_tagging", + "--bed", + str(in_bed), + "--sam", + str(empty_sam), + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(empty_sam, 'r') as out_file: + with open(empty_sam, "r") as out_file: assert captured.out == out_file.read() - def test_main_bed_sam_file(self, monkeypatch, capsys, bed_sam): """Test main function without options.""" in_bed, in_sam, output = bed_sam monkeypatch.setattr( - sys, 'argv', - ['iso_name_tagging', - '--bed', str(in_bed), - '--sam', str(in_sam), - ] + sys, + "argv", + [ + "iso_name_tagging", + "--bed", + str(in_bed), + "--sam", + str(in_sam), + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(output, 'r') as out_file: + with open(output, "r") as out_file: assert captured.out == out_file.read() - - def test_main_bed_sam_extension_file(self, monkeypatch, capsys, bed_sam_extension): + + def test_main_bed_sam_extension_file( + self, monkeypatch, capsys, bed_sam_extension + ): """Test main function with extension equals 6.""" in_bed, in_sam, output = bed_sam_extension monkeypatch.setattr( - sys, 'argv', - ['iso_name_tagging', - '--bed', str(in_bed), - '--sam', str(in_sam), - '--extension', '6', - ] + sys, + "argv", + [ + "iso_name_tagging", + "--bed", + str(in_bed), + "--sam", + str(in_sam), + "--extension", + "6", + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(output, 'r') as out_file: + with open(output, "r") as out_file: assert captured.out == out_file.read() - def test_main_bed_sam_file(self, monkeypatch, capsys, bed_sam_id): + def test_main_bed_sam_file_id(self, monkeypatch, capsys, bed_sam_id): """Test main function with id equals id.""" in_bed, in_sam, output = bed_sam_id monkeypatch.setattr( - sys, 'argv', - ['iso_name_tagging', - '--bed', str(in_bed), - '--sam', str(in_sam), - '--id', 'id' - ] + sys, + "argv", + [ + "iso_name_tagging", + "--bed", + str(in_bed), + "--sam", + str(in_sam), + "--id", + "id", + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(output, 'r') as out_file: - assert captured.out == out_file.read() \ No newline at end of file + with open(output, "r") as out_file: + assert captured.out == out_file.read() diff --git a/scripts/tests/test_mirna_extension.py b/scripts/tests/test_mirna_extension.py index 6640ea3d..16f83099 100644 --- a/scripts/tests/test_mirna_extension.py +++ b/scripts/tests/test_mirna_extension.py @@ -7,14 +7,7 @@ import gffutils import pytest -sys.path.append("../../") - - -from scripts.mirna_extension import( - main, - MirnaExtension, - parse_arguments -) +from ..mirna_extension import main, MirnaExtension, parse_arguments @pytest.fixture @@ -41,7 +34,7 @@ def gff_extremes(): in_extremes = Path("files/in_mirna_extreme_mirs.gff3") out_primir = Path("files/extreme_primir_anno.gff3") out_mir = Path("files/extreme_mir_anno.gff3") - + return in_extremes, out_primir, out_mir @@ -52,7 +45,7 @@ def gff_extremes_chr(): in_chr_extremes = Path("files/in_mirna_extreme_chr_mirs.gff3") out_primir = Path("files/extreme_chr_primir_anno.gff3") out_mir = Path("files/extreme_chr_mir_anno.gff3") - + return chr_size, in_chr_extremes, out_primir, out_mir @@ -62,10 +55,7 @@ class TestParseArguments: def test_no_files(self, monkeypatch): """Call without input nor output files.""" with pytest.raises(SystemExit) as sysex: - monkeypatch.setattr( - sys, 'argv', - ['mirna_extension'] - ) + monkeypatch.setattr(sys, "argv", ["mirna_extension"]) parse_arguments().parse_args() assert sysex.value.code == 2 @@ -74,13 +64,16 @@ def test_in_files(self, monkeypatch, gff_empty, tmp_path): gff_in = gff_empty monkeypatch.setattr( - sys, 'argv', - ['mirna_extension', - str(gff_in), - '--outdir', str(tmp_path), - ] + sys, + "argv", + [ + "mirna_extension", + str(gff_in), + "--outdir", + str(tmp_path), + ], ) - + args = parse_arguments().parse_args() assert isinstance(args, argparse.Namespace) @@ -89,15 +82,20 @@ def test_all_arguments(self, monkeypatch, gff_extremes_chr, tmp_path): chr_size, gff_in, gff_pre_out, gff_mir_out = gff_extremes_chr monkeypatch.setattr( - sys, 'argv', - ['mirna_extension', - str(gff_in), - '--outdir', str(tmp_path), - '--chr', str(chr_size), - '--extension', '6', - ] + sys, + "argv", + [ + "mirna_extension", + str(gff_in), + "--outdir", + str(tmp_path), + "--chr", + str(chr_size), + "--extension", + "6", + ], ) - + args = parse_arguments().parse_args() assert isinstance(args, argparse.Namespace) @@ -109,98 +107,104 @@ def test_main_empty_file(self, monkeypatch, gff_empty, tmp_path): """Test main function with an empty file.""" gff_empty = gff_empty - primir_out = tmp_path/"extended_primir_annotation_6_nt.gff3" - mir_out = tmp_path/"extended_mirna_annotation_6_nt.gff3" - + primir_out = tmp_path / "extended_primir_annotation_6_nt.gff3" + mir_out = tmp_path / "extended_mirna_annotation_6_nt.gff3" + monkeypatch.setattr( - sys, 'argv', - ['mirna_extension', - str(gff_empty), - '--outdir', str(tmp_path), - ] + sys, + "argv", + [ + "mirna_extension", + str(gff_empty), + "--outdir", + str(tmp_path), + ], ) args = parse_arguments().parse_args() main(args) - with open(gff_empty, 'r') as expected, open(primir_out, 'r') as output: - assert output.read() == expected.read() - - with open(gff_empty, 'r') as expected, open(mir_out, 'r') as output: - assert output.read() == expected.read() + with open(gff_empty, "r") as expected, open(primir_out, "r") as output: + assert output.read() == expected.read() + + with open(gff_empty, "r") as expected, open(mir_out, "r") as output: + assert output.read() == expected.read() - - - def test_main_no_extreme_coords(self, monkeypatch, tmp_path, gff_no_extremes): + def test_main_no_extreme_coords( + self, monkeypatch, tmp_path, gff_no_extremes + ): """Test main function with no extreme coords.""" in_gff, pre_gff, mir_gff = gff_no_extremes - primir_out = tmp_path/"extended_primir_annotation_6_nt.gff3" - mir_out = tmp_path/"extended_mirna_annotation_6_nt.gff3" + primir_out = tmp_path / "extended_primir_annotation_6_nt.gff3" + mir_out = tmp_path / "extended_mirna_annotation_6_nt.gff3" monkeypatch.setattr( - sys, 'argv', - ['mirna_extension', - str(in_gff), - '--outdir', str(tmp_path) - ] + sys, + "argv", + ["mirna_extension", str(in_gff), "--outdir", str(tmp_path)], ) args = parse_arguments().parse_args() main(args) - with open(pre_gff, 'r') as expected, open(primir_out, 'r') as output: - assert output.read() == expected.read() - - with open(mir_gff, 'r') as expected, open(mir_out, 'r') as output: - assert output.read() == expected.read() + with open(pre_gff, "r") as expected, open(primir_out, "r") as output: + assert output.read() == expected.read() + + with open(mir_gff, "r") as expected, open(mir_out, "r") as output: + assert output.read() == expected.read() def test_main_extreme_coords(self, monkeypatch, tmp_path, gff_extremes): """Test main function with extreme coords.""" in_gff, pre_gff, mir_gff = gff_extremes - primir_out = tmp_path/"extended_primir_annotation_6_nt.gff3" - mir_out = tmp_path/"extended_mirna_annotation_6_nt.gff3" + primir_out = tmp_path / "extended_primir_annotation_6_nt.gff3" + mir_out = tmp_path / "extended_mirna_annotation_6_nt.gff3" monkeypatch.setattr( - sys, 'argv', - ['mirna_extension', - str(in_gff), - '--outdir', str(tmp_path) - ] + sys, + "argv", + ["mirna_extension", str(in_gff), "--outdir", str(tmp_path)], ) args = parse_arguments().parse_args() main(args) - with open(pre_gff, 'r') as expected, open(primir_out, 'r') as output: - assert output.read() == expected.read() + with open(pre_gff, "r") as expected, open(primir_out, "r") as output: + assert output.read() == expected.read() - with open(mir_gff, 'r') as expected, open(mir_out, 'r') as output: - assert output.read() == expected.read() + with open(mir_gff, "r") as expected, open(mir_out, "r") as output: + assert output.read() == expected.read() - def test_main_extreme_coords(self, monkeypatch, tmp_path, gff_extremes_chr): + def test_main_extreme_coords_limit_size( + self, monkeypatch, tmp_path, gff_extremes_chr + ): """Test main function with extreme coords and limited by chr size.""" chr_size, in_gff, pre_gff, mir_gff = gff_extremes_chr - primir_out = tmp_path/"extended_primir_annotation_6_nt.gff3" - mir_out = tmp_path/"extended_mirna_annotation_6_nt.gff3" + primir_out = tmp_path / "extended_primir_annotation_6_nt.gff3" + mir_out = tmp_path / "extended_mirna_annotation_6_nt.gff3" monkeypatch.setattr( - sys, 'argv', - ['mirna_extension', - str(in_gff), - '--outdir', str(tmp_path), - '--chr', str(chr_size) - ] + sys, + "argv", + [ + "mirna_extension", + str(in_gff), + "--outdir", + str(tmp_path), + "--chr", + str(chr_size), + ], ) args = parse_arguments().parse_args() main(args) - with open(pre_gff, 'r') as expected, open(primir_out, 'r') as output: - assert output.read() == expected.read() + with open(pre_gff, "r") as expected, open(primir_out, "r") as output: + assert output.read() == expected.read() + + with open(mir_gff, "r") as expected, open(mir_out, "r") as output: + assert output.read() == expected.read() - with open(mir_gff, 'r') as expected, open(mir_out, 'r') as output: - assert output.read() == expected.read() -class TestLoadGffFile(): +class TestLoadGffFile: """Test for the 'load_gff_file' method.""" def test_load_gff_file(self, gff_no_extremes): @@ -212,20 +216,34 @@ def test_load_gff_file(self, gff_no_extremes): assert mirnaObject is not None assert isinstance(mirnaObject.db, gffutils.FeatureDB) - assert len(list(mirnaObject.db.features_of_type("miRNA_primary_transcript"))) == 2 + assert ( + len( + list( + mirnaObject.db.features_of_type("miRNA_primary_transcript") + ) + ) + == 2 + ) assert len(list(mirnaObject.db.features_of_type("miRNA"))) == 3 - def test_load_gff_file(self, monkeypatch, gff_no_extremes): + def test_load_gff_file_std(self, monkeypatch, gff_no_extremes): """Test input loading from standard input.""" in_file, pre_exp, mir_exp = gff_no_extremes - monkeypatch.setattr(sys, 'stdin', str(in_file)) + monkeypatch.setattr(sys, "stdin", str(in_file)) mirnaObject = MirnaExtension() mirnaObject.load_gff_file() assert mirnaObject is not None assert isinstance(mirnaObject.db, gffutils.FeatureDB) - assert len(list(mirnaObject.db.features_of_type("miRNA_primary_transcript"))) == 2 + assert ( + len( + list( + mirnaObject.db.features_of_type("miRNA_primary_transcript") + ) + ) + == 2 + ) assert len(list(mirnaObject.db.features_of_type("miRNA"))) == 3 @@ -236,57 +254,59 @@ def test_extend_mirnas_no_extreme_coords(self, tmp_path, gff_no_extremes): """Test miRNA extension with no extreme coordinates.""" in_file, pre_exp, mir_exp = gff_no_extremes - primir_out = tmp_path/"extended_primir_annotation_6_nt.gff3" - mir_out = tmp_path/"extended_mirna_annotation_6_nt.gff3" + primir_out = tmp_path / "extended_primir_annotation_6_nt.gff3" + mir_out = tmp_path / "extended_mirna_annotation_6_nt.gff3" mirnaObject = MirnaExtension() mirnaObject.load_gff_file(str(in_file)) mirnaObject.extend_mirnas(primir_out=primir_out, mir_out=mir_out) - with open(primir_out, 'r') as output, open(pre_exp, 'r') as expected: - assert output.read() == expected.read() - - with open(mir_out, 'r') as output, open(mir_exp, 'r') as expected: + with open(primir_out, "r") as output, open(pre_exp, "r") as expected: + assert output.read() == expected.read() + + with open(mir_out, "r") as output, open(mir_exp, "r") as expected: assert output.read() == expected.read() - + def test_extend_mirnas_extreme_coords(self, tmp_path, gff_extremes): """Test miRNA extension with miRNAs having extreme coordinates.""" in_file, pre_exp, mir_exp = gff_extremes - primir_out = tmp_path/"extended_primir_annotation_6_nt.gff3" - mir_out = tmp_path/"extended_mirna_annotation_6_nt.gff3" + primir_out = tmp_path / "extended_primir_annotation_6_nt.gff3" + mir_out = tmp_path / "extended_mirna_annotation_6_nt.gff3" mirnaObject = MirnaExtension() mirnaObject.load_gff_file(str(in_file)) mirnaObject.extend_mirnas(primir_out=primir_out, mir_out=mir_out) - with open(primir_out, 'r') as output, open(pre_exp, 'r') as expected: - assert output.read() == expected.read() - - with open(mir_out, 'r') as output, open(mir_exp, 'r') as expected: + with open(primir_out, "r") as output, open(pre_exp, "r") as expected: + assert output.read() == expected.read() + + with open(mir_out, "r") as output, open(mir_exp, "r") as expected: assert output.read() == expected.read() - def test_extend_mirnas_no_extreme_coords(self, tmp_path, gff_extremes_chr): + def test_extend_mirnas_extreme_coords_chr_boundaries( + self, tmp_path, gff_extremes_chr + ): """Test miRNA extension with extreme coordinates and chr boundaries.""" chr_size, in_file, pre_exp, mir_exp = gff_extremes_chr - primir_out = tmp_path/"extended_primir_annotation_6_nt.gff3" - mir_out = tmp_path/"extended_mirna_annotation_6_nt.gff3" + primir_out = tmp_path / "extended_primir_annotation_6_nt.gff3" + mir_out = tmp_path / "extended_mirna_annotation_6_nt.gff3" len_dict = {} - with open(chr_size, 'r') as f: + with open(chr_size, "r") as f: for line in f: - line = line.strip().split('\t') + line = line.strip().split("\t") len_dict[line[0]] = int(line[1]) mirnaObject = MirnaExtension() mirnaObject.load_gff_file(str(in_file)) - mirnaObject.extend_mirnas(primir_out=primir_out, - mir_out=mir_out, - seq_lengths=len_dict) - - with open(primir_out, 'r') as output, open(pre_exp, 'r') as expected: - assert output.read() == expected.read() - - with open(mir_out, 'r') as output, open(mir_exp, 'r') as expected: + mirnaObject.extend_mirnas( + primir_out=primir_out, mir_out=mir_out, seq_lengths=len_dict + ) + + with open(primir_out, "r") as output, open(pre_exp, "r") as expected: + assert output.read() == expected.read() + + with open(mir_out, "r") as output, open(mir_exp, "r") as expected: assert output.read() == expected.read() diff --git a/scripts/tests/test_mirna_quantification.py b/scripts/tests/test_mirna_quantification.py index e1715283..cc044b12 100644 --- a/scripts/tests/test_mirna_quantification.py +++ b/scripts/tests/test_mirna_quantification.py @@ -6,16 +6,14 @@ import pysam import pytest -sys.path.append("../../") - -from scripts.mirna_quantification import ( +from ..mirna_quantification import ( collapsed_nh_contribution, collapsed_contribution, nh_contribution, contribution, get_name, main, - parse_arguments + parse_arguments, ) @@ -85,7 +83,7 @@ def read_sam_file(): @pytest.fixture def read_len_sam_file(): - """Import path to test files with read IDs and feature len in the output table.""" + """Import path to test files with read IDs and len in the output table.""" sam_file = Path("files/in_aln_tag.sam") out_table = Path("files/len_ids_iso_mirna_quantification") @@ -160,9 +158,11 @@ def test_no_input(self, monkeypatch): """Call without input file.""" with pytest.raises(SystemExit) as sysex: monkeypatch.setattr( - sys, 'argv', - ['mirna_quantification', - ] + sys, + "argv", + [ + "mirna_quantification", + ], ) parse_arguments().parse_args() assert sysex.value.code == 2 @@ -172,11 +172,14 @@ def test_correct_input(self, monkeypatch, empty_file): in_sam = empty_file monkeypatch.setattr( - sys, 'argv', - ['mirna__quantification', - str(in_sam), - '--lib', 'test_lib', - ] + sys, + "argv", + [ + "mirna__quantification", + str(in_sam), + "--lib", + "test_lib", + ], ) args = parse_arguments().parse_args() assert isinstance(args, argparse.Namespace) @@ -187,10 +190,13 @@ def test_too_many_input_files(self, monkeypatch, empty_file): with pytest.raises(SystemExit) as sysex: monkeypatch.setattr( - sys, 'argv', - ['mirna_quantification', - str(in_sam), str(in_sam), - ] + sys, + "argv", + [ + "mirna_quantification", + str(in_sam), + str(in_sam), + ], ) parse_arguments().parse_args() assert sysex.value.code == 2 @@ -200,34 +206,40 @@ def test_all_input(self, monkeypatch, empty_file): in_sam = empty_file monkeypatch.setattr( - sys, 'argv', - ['mirna_quantification', - str(in_sam), - '--count', - '--len', - '--read-ids', - '--collapsed', - '--nh', - '--tag', 'YW', - '--outdir', 'Path.cwd()', - '--lib', 'test_lib', - '--mir-list', '[mirna, isomir]' - ] + sys, + "argv", + [ + "mirna_quantification", + str(in_sam), + "--count", + "--len", + "--read-ids", + "--collapsed", + "--nh", + "--tag", + "YW", + "--outdir", + "Path.cwd()", + "--lib", + "test_lib", + "--mir-list", + "[mirna, isomir]", + ], ) args = parse_arguments().parse_args() assert isinstance(args, argparse.Namespace) -class TestGetContribution(): +class TestGetContribution: """Test 'get_contribution()' function.""" def test_collapsed_nh(self, alns): """Test collapsed alignment with NH in the name.""" - assert collapsed_nh_contribution(alns[0]) == 2/3 + assert collapsed_nh_contribution(alns[0]) == 2 / 3 def test_uncollpased_nh(self, alns): """Test uncollapsed alignment with NH in the name.""" - assert nh_contribution(alns[1]) == 1/4 + assert nh_contribution(alns[1]) == 1 / 4 def test_collapsed_no_nh(self, alns): """Test collapsed alignment without NH in the name.""" @@ -235,7 +247,7 @@ def test_collapsed_no_nh(self, alns): def test_uncollpased_no_nh(self, alns): """Test uncollapsed alignment without NH in the name.""" - assert contribution(alns[3]) == 1/8 + assert contribution(alns[3]) == 1 / 8 def test_uncollpased_missing_nh(self, alns): """Test uncollapsed alignment with missing NH value.""" @@ -267,222 +279,276 @@ class TestMain: def test_main_empty_sam_file(self, monkeypatch, tmp_path, empty_file): """Test main function with an empty SAM file.""" empty_in, empty_out = empty_file - output = tmp_path/"mirna_counts_lib" + output = tmp_path / "mirna_counts_lib" monkeypatch.setattr( - sys, 'argv', - ['mirna_quantification', - str(empty_in), - '--outdir', str(tmp_path), - ] + sys, + "argv", + [ + "mirna_quantification", + str(empty_in), + "--outdir", + str(tmp_path), + ], ) args = parse_arguments().parse_args() main(args) - with open(empty_out, 'r') as expected, open(output, 'r') as out_file: + with open(empty_out, "r") as expected, open(output, "r") as out_file: assert out_file.read() == expected.read() def test_main_isomir_mirna_sam_file(self, monkeypatch, tmp_path, sam_file): """Test main function with complete SAM file.""" infile, out_table = sam_file - output = tmp_path/"mirna_counts_lib" + output = tmp_path / "mirna_counts_lib" monkeypatch.setattr( - sys, 'argv', - ['mirna_quantification', - str(infile), - '--collapsed', - '--nh', - '--outdir', str(tmp_path), - ] + sys, + "argv", + [ + "mirna_quantification", + str(infile), + "--collapsed", + "--nh", + "--outdir", + str(tmp_path), + ], ) args = parse_arguments().parse_args() main(args) - with open(out_table, 'r') as expected, open(output, 'r') as out_file: + with open(out_table, "r") as expected, open(output, "r") as out_file: assert out_file.read() == expected.read() - def test_main_iso_sam_file(self, monkeypatch, tmp_path, iso_mirna_sam_file): + def test_main_iso_sam_file( + self, monkeypatch, tmp_path, iso_mirna_sam_file + ): """Test main function tabulating only isomiRs.""" infile, iso_out_table, mirna_out_table = iso_mirna_sam_file - mirna_output = tmp_path/"mirna_counts_lib" + mirna_output = tmp_path / "mirna_counts_lib" monkeypatch.setattr( - sys, 'argv', - ['mirna_quantification', - str(infile), - '--collapsed', - '--nh', - '--outdir', str(tmp_path), - '--mir-list', "isomir", - ] + sys, + "argv", + [ + "mirna_quantification", + str(infile), + "--collapsed", + "--nh", + "--outdir", + str(tmp_path), + "--mir-list", + "isomir", + ], ) args = parse_arguments().parse_args() main(args) - with open(iso_out_table, 'r') as expected, open(mirna_output, 'r') as out_file: + with open(iso_out_table, "r") as expected, open( + mirna_output, "r" + ) as out_file: assert out_file.read() == expected.read() - def test_main_mirna_sam_file(self, monkeypatch, tmp_path, iso_mirna_sam_file): + def test_main_mirna_sam_file( + self, monkeypatch, tmp_path, iso_mirna_sam_file + ): """Test main function tabulating only canonical miRNA.""" infile, iso_out_table, mirna_out_table = iso_mirna_sam_file - mirna_output = tmp_path/"mirna_counts_lib" + mirna_output = tmp_path / "mirna_counts_lib" monkeypatch.setattr( - sys, 'argv', - ['mirna_quantification', - str(infile), - '--collapsed', - '--nh', - '--outdir', str(tmp_path), - '--mir-list', "mirna" - ] + sys, + "argv", + [ + "mirna_quantification", + str(infile), + "--collapsed", + "--nh", + "--outdir", + str(tmp_path), + "--mir-list", + "mirna", + ], ) args = parse_arguments().parse_args() main(args) - with open(mirna_out_table, 'r') as expected, open(mirna_output, 'r') as out_file: + with ( + open(mirna_out_table, "r") as expected, + open(mirna_output, "r") as out_file, + ): assert out_file.read() == expected.read() - def test_main_xn_tag_sam_file(self, monkeypatch, tmp_path, xn_tag_sam_file): + def test_main_xn_tag_sam_file( + self, monkeypatch, tmp_path, xn_tag_sam_file + ): """Test main function with feature name in the XN tag.""" infile, out_table = xn_tag_sam_file - output = tmp_path/"mirna_counts_lib" + output = tmp_path / "mirna_counts_lib" monkeypatch.setattr( - sys, 'argv', - ['mirna_quantification', - str(infile), - '--collapsed', - '--nh', - '--tag', 'XN', - '--outdir', str(tmp_path), - ] + sys, + "argv", + [ + "mirna_quantification", + str(infile), + "--collapsed", + "--nh", + "--tag", + "XN", + "--outdir", + str(tmp_path), + ], ) args = parse_arguments().parse_args() main(args) - with open(out_table, 'r') as expected, open(output, 'r') as out_file: + with open(out_table, "r") as expected, open(output, "r") as out_file: assert out_file.read() == expected.read() - def test_main_nh_missing_sam_file(self, monkeypatch, tmp_path, nh_missing_sam_file): + def test_main_nh_missing_sam_file( + self, monkeypatch, tmp_path, nh_missing_sam_file + ): """Test main function with some missing NH tag in SAM file.""" infile, out_table = nh_missing_sam_file - output = tmp_path/"mirna_counts_lib" + output = tmp_path / "mirna_counts_lib" monkeypatch.setattr( - sys, 'argv', - ['mirna_quantification', - str(infile), - '--collapsed', - '--outdir', str(tmp_path), - ] + sys, + "argv", + [ + "mirna_quantification", + str(infile), + "--collapsed", + "--outdir", + str(tmp_path), + ], ) args = parse_arguments().parse_args() main(args) - with open(out_table, 'r') as expected, open(output, 'r') as out_file: + with open(out_table, "r") as expected, open(output, "r") as out_file: assert out_file.read() == expected.read() def test_main_seq_len_sam_file(self, monkeypatch, tmp_path, len_sam_file): """Test main function with read lenght in output table.""" infile, out_table = len_sam_file - output = tmp_path/"mirna_counts_lib" + output = tmp_path / "mirna_counts_lib" monkeypatch.setattr( - sys, 'argv', - ['mirna_quantification', - str(infile), - '--collapsed', - '--nh', - '--len', - '--outdir', str(tmp_path), - ] + sys, + "argv", + [ + "mirna_quantification", + str(infile), + "--collapsed", + "--nh", + "--len", + "--outdir", + str(tmp_path), + ], ) args = parse_arguments().parse_args() main(args) - with open(out_table, 'r') as expected, open(output, 'r') as out_file: + with open(out_table, "r") as expected, open(output, "r") as out_file: assert out_file.read() == expected.read() def test_main_read_sam_file(self, monkeypatch, tmp_path, read_sam_file): """Test main function with intersecting read IDs in the output.""" infile, out_table = read_sam_file - output = tmp_path/"mirna_counts_lib" + output = tmp_path / "mirna_counts_lib" monkeypatch.setattr( - sys, 'argv', - ['mirna_quantification', - str(infile), - '--collapsed', - '--nh', - '--read-ids', - '--outdir', str(tmp_path), - ] + sys, + "argv", + [ + "mirna_quantification", + str(infile), + "--collapsed", + "--nh", + "--read-ids", + "--outdir", + str(tmp_path), + ], ) args = parse_arguments().parse_args() main(args) - with open(out_table, 'r') as expected, open(output, 'r') as out_file: + with open(out_table, "r") as expected, open(output, "r") as out_file: assert out_file.read() == expected.read() - def test_main_read_len_sam_file(self, monkeypatch, tmp_path, read_len_sam_file): + def test_main_read_len_sam_file( + self, monkeypatch, tmp_path, read_len_sam_file + ): """Test main function with read IDs and feature length im output.""" infile, out_table = read_len_sam_file - output = tmp_path/"mirna_counts_lib" + output = tmp_path / "mirna_counts_lib" monkeypatch.setattr( - sys, 'argv', - ['mirna_quantification', - str(infile), - '--collapsed', - '--nh', - '--len', - '--read-ids', - '--outdir', str(tmp_path), - ] + sys, + "argv", + [ + "mirna_quantification", + str(infile), + "--collapsed", + "--nh", + "--len", + "--read-ids", + "--outdir", + str(tmp_path), + ], ) args = parse_arguments().parse_args() main(args) - with open(out_table, 'r') as expected, open(output, 'r') as out_file: + with open(out_table, "r") as expected, open(output, "r") as out_file: assert out_file.read() == expected.read() - - def test_main_uncollpased_sam_file(self, monkeypatch, tmp_path, uncollapsed_sam_file): + def test_main_uncollpased_sam_file( + self, monkeypatch, tmp_path, uncollapsed_sam_file + ): """Test main function with uncollapsed SAM file.""" infile, out_table = uncollapsed_sam_file - output = tmp_path/"mirna_counts_lib" + output = tmp_path / "mirna_counts_lib" monkeypatch.setattr( - sys, 'argv', - ['mirna_quantification', - str(infile), - '--nh', - '--outdir', str(tmp_path), - ] + sys, + "argv", + [ + "mirna_quantification", + str(infile), + "--nh", + "--outdir", + str(tmp_path), + ], ) args = parse_arguments().parse_args() main(args) - with open(out_table, 'r') as expected, open(output, 'r') as out_file: + with open(out_table, "r") as expected, open(output, "r") as out_file: assert out_file.read() == expected.read() - def test_main_uncollpased_missing_nh_sam_file(self, monkeypatch, tmp_path, uncollapsed_missing_nh_sam_file): + def test_main_uncollap_miss_nh_sam_file( + self, monkeypatch, tmp_path, uncollapsed_missing_nh_sam_file + ): """Test main function with uncollapsed SAM file and missing NH tags.""" infile, out_table = uncollapsed_missing_nh_sam_file - output = tmp_path/"mirna_counts_lib" + output = tmp_path / "mirna_counts_lib" monkeypatch.setattr( - sys, 'argv', - ['mirna_quantification', - str(infile), - '--count', - '--outdir', str(tmp_path), - ] + sys, + "argv", + [ + "mirna_quantification", + str(infile), + "--count", + "--outdir", + str(tmp_path), + ], ) args = parse_arguments().parse_args() main(args) - with open(out_table, 'r') as expected, open(output, 'r') as out_file: + with open(out_table, "r") as expected, open(output, "r") as out_file: assert out_file.read() == expected.read() diff --git a/scripts/tests/test_oligomap_output_to_sam_nh_filtered.py b/scripts/tests/test_oligomap_output_to_sam_nh_filtered.py index 7a143f07..ac635595 100644 --- a/scripts/tests/test_oligomap_output_to_sam_nh_filtered.py +++ b/scripts/tests/test_oligomap_output_to_sam_nh_filtered.py @@ -3,20 +3,16 @@ import argparse from pathlib import Path import sys -from typing import NamedTuple import pytest -sys.path.append("../../") - - -from scripts.oligomap_output_to_sam_nh_filtered import( +from ..oligomap_output_to_sam_nh_filtered import ( eval_aln, Fields, get_cigar_md, get_sam_fields, main, - parse_arguments + parse_arguments, ) @@ -45,39 +41,119 @@ def transcriptome_no_nh(): return oligo_in, oligo_out + @pytest.fixture def single_read(): """Import path to test file with a single read.""" oligo_out = Path("files/oligomap_single_read.sam") - + return oligo_out + @pytest.fixture def aln_fields(): """Create sample alignment as a Fields class NamedTuple.""" # Perfect alignment - field_1 = Fields("read_1", "0", "19", "44377", "255", "19M", '*', '0', '0', - "CTACAAAGGGAAGCACTTT", '*', "NM:i:0", "MD:Z:19") + field_1 = Fields( + "read_1", + "0", + "19", + "44377", + "255", + "19M", + "*", + "0", + "0", + "CTACAAAGGGAAGCACTTT", + "*", + "NM:i:0", + "MD:Z:19", + ) # Alignment with a mismatch in the first position - field_2 = Fields("read_1", '0', "19", "53471", "255", "19M", '*', '0', '0', - "CTACAAAGGGAAGCACTTT", '*', "NM:i:1", "MD:Z:G18") + field_2 = Fields( + "read_1", + "0", + "19", + "53471", + "255", + "19M", + "*", + "0", + "0", + "CTACAAAGGGAAGCACTTT", + "*", + "NM:i:1", + "MD:Z:G18", + ) # Alignment with a mismatch in the last position - field_3 = Fields("read_1", "0", "19", "44278", "255", "19M", '*', '0', '0', - "CTACAAAGGGAAGCACTTT", '*', "NM:i:1", "MD:Z:18C") - + field_3 = Fields( + "read_1", + "0", + "19", + "44278", + "255", + "19M", + "*", + "0", + "0", + "CTACAAAGGGAAGCACTTT", + "*", + "NM:i:1", + "MD:Z:18C", + ) + # Alignment with a mismatch in the middle of the read sequence - field_4 = Fields("read_1", "0", "19", "50971", "255", "19M", '*', '0', '0', - "CTACAAAGGGAAGCACTTT", '*', "NM:i:1", "MD:Z:14C4") - + field_4 = Fields( + "read_1", + "0", + "19", + "50971", + "255", + "19M", + "*", + "0", + "0", + "CTACAAAGGGAAGCACTTT", + "*", + "NM:i:1", + "MD:Z:14C4", + ) + # Alignment with an insertion at read's first position - field_5 = Fields("read_2", "16", "19", "7627", "255", "1I22M", '*', '0', '0', - "AAAGCACCTCCAGAGCTTGAAGC", '*', "NM:i:1", "MD:Z:23") - + field_5 = Fields( + "read_2", + "16", + "19", + "7627", + "255", + "1I22M", + "*", + "0", + "0", + "AAAGCACCTCCAGAGCTTGAAGC", + "*", + "NM:i:1", + "MD:Z:23", + ) + # Alignment with an insertion in the middle of the read sequence - field_6 = Fields("read_2", "16", "19", "7886", "255", "9M1I12M", '*', '0', '0', - "AAAGCACCTCCAGAGCTTGAAGC", '*', "NM:i:1", "MD:Z:23") + field_6 = Fields( + "read_2", + "16", + "19", + "7886", + "255", + "9M1I12M", + "*", + "0", + "0", + "AAAGCACCTCCAGAGCTTGAAGC", + "*", + "NM:i:1", + "MD:Z:23", + ) return [field_1, field_2, field_3, field_4, field_5, field_6] @@ -170,8 +246,7 @@ def test_no_files(self, monkeypatch): """Call without input nor output files.""" with pytest.raises(SystemExit) as sysex: monkeypatch.setattr( - sys, 'argv', - ['oligomap_output_to_sam_nh_filtered'] + sys, "argv", ["oligomap_output_to_sam_nh_filtered"] ) parse_arguments().parse_args() assert sysex.value.code == 2 @@ -181,12 +256,14 @@ def test_in_files(self, monkeypatch, empty_file): empty_in = empty_file monkeypatch.setattr( - sys, 'argv', - ['oligomap_output_to_sam_nh_filtered', - str(empty_in), - ] + sys, + "argv", + [ + "oligomap_output_to_sam_nh_filtered", + str(empty_in), + ], ) - + args = parse_arguments().parse_args() assert isinstance(args, argparse.Namespace) @@ -195,13 +272,16 @@ def test_all_arguments(self, monkeypatch, genome_nh_2): fa_in, sam_out = genome_nh_2 monkeypatch.setattr( - sys, 'argv', - ['oligomap_output_to_sam_nh_filtered', - str(fa_in), - '-n', '100', - ] + sys, + "argv", + [ + "oligomap_output_to_sam_nh_filtered", + str(fa_in), + "-n", + "100", + ], ) - + args = parse_arguments().parse_args() assert isinstance(args, argparse.Namespace) @@ -213,93 +293,121 @@ def test_perfect_aln(self, alns): """Test perfect alignment.""" result = ("19M", "MD:Z:19") - assert get_cigar_md(alns[0][0], alns[0][1], - alns[0][2], alns[0][3]) == result + assert ( + get_cigar_md(alns[0][0], alns[0][1], alns[0][2], alns[0][3]) + == result + ) def test_mm_first_pos_aln(self, alns): """Test mismatch at read's first position.""" result = ("19M", "MD:Z:G18") - assert get_cigar_md(alns[1][0], alns[1][1], - alns[1][2], alns[1][3]) == result + assert ( + get_cigar_md(alns[1][0], alns[1][1], alns[1][2], alns[1][3]) + == result + ) def test_mm_last_pos_aln(self, alns): """Test mismatch at read's last position.""" result = ("19M", "MD:Z:18C") - assert get_cigar_md(alns[2][0], alns[2][1], - alns[2][2], alns[2][3]) == result + assert ( + get_cigar_md(alns[2][0], alns[2][1], alns[2][2], alns[2][3]) + == result + ) def test_mm_middle_aln(self, alns): """Test mismatch in the middle of the read.""" result = ("19M", "MD:Z:14C4") - assert get_cigar_md(alns[3][0], alns[3][1], - alns[3][2], alns[3][3]) == result + assert ( + get_cigar_md(alns[3][0], alns[3][1], alns[3][2], alns[3][3]) + == result + ) def test_in_first_pos_aln(self, alns): """Test insertion at read's first position.""" result = ("1I22M", "MD:Z:23") - assert get_cigar_md(alns[4][0], alns[4][1], - alns[4][2], alns[4][3]) == result + assert ( + get_cigar_md(alns[4][0], alns[4][1], alns[4][2], alns[4][3]) + == result + ) def test_in_last_pos_aln(self, alns): """Test insertion at read's last position.""" result = ("22M1I", "MD:Z:23") - assert get_cigar_md(alns[5][0], alns[5][1], - alns[5][2], alns[5][3]) == result + assert ( + get_cigar_md(alns[5][0], alns[5][1], alns[5][2], alns[5][3]) + == result + ) def test_in_middle_aln(self, alns): """Test insertion in the middle of the read.""" result = ("9M1I13M", "MD:Z:23") - assert get_cigar_md(alns[6][0], alns[6][1], - alns[6][2], alns[6][3]) == result + assert ( + get_cigar_md(alns[6][0], alns[6][1], alns[6][2], alns[6][3]) + == result + ) def test_del_first_pos_aln(self, alns): """Test deletion at read's first position.""" result = ("1D22M", "MD:Z:^T22") - assert get_cigar_md(alns[7][0], alns[7][1], - alns[7][2], alns[7][3]) == result + assert ( + get_cigar_md(alns[7][0], alns[7][1], alns[7][2], alns[7][3]) + == result + ) def test_del_last_pos_aln(self, alns): """Test deletion at read's last position.""" result = ("22M1D", "MD:Z:22^A0") - assert get_cigar_md(alns[8][0], alns[8][1], - alns[8][2], alns[8][3]) == result + assert ( + get_cigar_md(alns[8][0], alns[8][1], alns[8][2], alns[8][3]) + == result + ) def test_del_middle_aln(self, alns): """Test deletion in the middle of the read.""" result = ("11M1D10M", "MD:Z:11^A10") - assert get_cigar_md(alns[9][0], alns[9][1], - alns[9][2], alns[9][3]) == result + assert ( + get_cigar_md(alns[9][0], alns[9][1], alns[9][2], alns[9][3]) + == result + ) -class TestGetSAMFields(): +class TestGetSAMFields: """Test 'get_sam_fields()' function.""" - def test_pos_strand_no_err(self,alns, aln_fields): + def test_pos_strand_no_err(self, alns, aln_fields): """Test perfect alignment in the positive strand.""" line1 = "read_1 (19 nc) 1...19 19 44377...44395" line2 = "19" line3 = "errors: 0 orientation: +" - assert get_sam_fields([line1, line2, line3, alns[0][1], - alns[0][2], alns[0][3]]) == aln_fields[0] + assert ( + get_sam_fields( + [line1, line2, line3, alns[0][1], alns[0][2], alns[0][3]] + ) + == aln_fields[0] + ) - def test_neg_strand_one_err(self,alns, aln_fields): + def test_neg_strand_one_err(self, alns, aln_fields): """Test alignment with an insertion in the negative strand.""" line1 = "read_2 (23 nc) 1...23 19 7886...7908" line2 = "19" line3 = "errors: 1 orientation: -" - assert get_sam_fields([line1, line2, line3, alns[6][1], - alns[6][2], alns[6][3]]) == aln_fields[5] + assert ( + get_sam_fields( + [line1, line2, line3, alns[6][1], alns[6][2], alns[6][3]] + ) + == aln_fields[5] + ) class TestEvalAln: @@ -308,43 +416,43 @@ class TestEvalAln: def test_eval_empty_dict_new_read(self, aln_fields): """Test evaluation with a new read and an empty dictionary.""" d = dict() - minerr_nh = {"read_0" : ['0', 1]} + minerr_nh = {"read_0": ["0", 1]} aln = aln_fields[0] nhfilter = None eval_aln(nhfilter, d, minerr_nh, aln) assert list(d.keys())[0] == aln.read_name - assert minerr_nh[aln.read_name] == ['0', 1] - + assert minerr_nh[aln.read_name] == ["0", 1] + def test_eval_empty_dict_smaller_error(self, aln_fields): """Test evaluation with a smaller error and an empty dictionary.""" d = dict() - minerr_nh = {"read_1" : ['1', 1]} + minerr_nh = {"read_1": ["1", 1]} aln = aln_fields[0] nhfilter = None eval_aln(nhfilter, d, minerr_nh, aln) assert list(d.keys())[0] == aln.read_name - assert minerr_nh[aln.read_name] == ['0', 1] + assert minerr_nh[aln.read_name] == ["0", 1] def test_increase_nh_no_filter(self, aln_fields): """Test evaluation when increasing NH without a maximum value.""" d = {"read_1": [aln_fields[1], aln_fields[2]]} - minerr_nh = {"read_1" : ['1', 2]} + minerr_nh = {"read_1": ["1", 2]} aln = aln_fields[3] nhfilter = None eval_aln(nhfilter, d, minerr_nh, aln) assert len(d[aln.read_name]) == 3 - assert minerr_nh[aln.read_name] == ['1', 3] + assert minerr_nh[aln.read_name] == ["1", 3] def test_exceed_nh_filter_2(self, capsys, aln_fields): """Test evaluation when exceeding the maximum NH set to 2.""" d = {"read_1": [aln_fields[1], aln_fields[2]]} - minerr_nh = {"read_1" : ['1', 2]} + minerr_nh = {"read_1": ["1", 2]} aln = aln_fields[3] nhfilter = 2 @@ -352,44 +460,43 @@ def test_exceed_nh_filter_2(self, capsys, aln_fields): captured = capsys.readouterr() assert len(d) == 0 - assert minerr_nh[aln.read_name] == ['1', 3] + assert minerr_nh[aln.read_name] == ["1", 3] assert captured.err == "Filtered by NH | Read read_1 | Errors = 1\n" - + def test_no_exceed_nh_filter_2(self, aln_fields): """Test evaluation when increasing NH with maximum value of 2.""" d = {"read_1": [aln_fields[1]]} - minerr_nh = {"read_1" : ['1', 1]} + minerr_nh = {"read_1": ["1", 1]} aln = aln_fields[2] nhfilter = 2 eval_aln(nhfilter, d, minerr_nh, aln) assert len(d[aln.read_name]) == 2 - assert minerr_nh[aln.read_name] == ['1', 2] - + assert minerr_nh[aln.read_name] == ["1", 2] + def test_smaller_min_error(self, capsys, aln_fields): """Test evaluation when having a smaller minimumm error.""" d = {"read_1": [aln_fields[1], aln_fields[2]]} - minerr_nh = {"read_1" : ['1', 2]} + minerr_nh = {"read_1": ["1", 2]} aln = aln_fields[0] - nhfilter = None + nhfilter = None eval_aln(nhfilter, d, minerr_nh, aln) captured = capsys.readouterr() assert len(d[aln.read_name]) == 1 - assert minerr_nh[aln.read_name] == ['0', 1] + assert minerr_nh[aln.read_name] == ["0", 1] assert captured.err == "Filtered by ERROR | Read read_1 | Errors = 1\n" def test_different_read(self, capsys, tmp_path, aln_fields, single_read): """Test evaluation when having to write due to a different read.""" - output = tmp_path/"oligomap_genome_mappings.sam" out_file = single_read d = {"read_1": [aln_fields[1], aln_fields[2]]} - minerr_nh = {"read_1" : ['1', 2]} + minerr_nh = {"read_1": ["1", 2]} aln = aln_fields[4] - nhfilter = None + nhfilter = None eval_aln(nhfilter, d, minerr_nh, aln) captured = capsys.readouterr() @@ -398,9 +505,9 @@ def test_different_read(self, capsys, tmp_path, aln_fields, single_read): assert len(minerr_nh) == 1 assert captured.err == "Written read read_1 | Errors = 1 | NH = 2\n" - with open(out_file, 'r') as expected: + with open(out_file, "r") as expected: assert captured.out == expected.read() - + class TestMain: """Test 'main()' function.""" @@ -410,16 +517,18 @@ def test_main_empty_file(self, monkeypatch, capsys, empty_file): empty_in = empty_file monkeypatch.setattr( - sys, 'argv', - ['oligomap_output_to_sam_nh_filtered', - str(empty_in), - ] + sys, + "argv", + [ + "oligomap_output_to_sam_nh_filtered", + str(empty_in), + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(empty_in, 'r') as expected: + with open(empty_in, "r") as expected: assert captured.out == expected.read() def test_main_max_nh_2(self, monkeypatch, capsys, genome_nh_2): @@ -427,34 +536,39 @@ def test_main_max_nh_2(self, monkeypatch, capsys, genome_nh_2): in_file, out_file = genome_nh_2 monkeypatch.setattr( - sys, 'argv', - ['oligomap_output_to_sam_nh_filtered', - str(in_file), - '-n', '2', - ] + sys, + "argv", + [ + "oligomap_output_to_sam_nh_filtered", + str(in_file), + "-n", + "2", + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(out_file, 'r') as expected: + with open(out_file, "r") as expected: assert captured.out == expected.read() - def test_main_no_nh_transcriptome(self, monkeypatch, capsys, - transcriptome_no_nh): + def test_main_no_nh_transcriptome( + self, monkeypatch, capsys, transcriptome_no_nh + ): """Test main function with no NH set for transcriptome mappings.""" in_file, out_file = transcriptome_no_nh monkeypatch.setattr( - sys, 'argv', - ['oligomap_output_to_sam_nh_filtered', - str(in_file), - ] + sys, + "argv", + [ + "oligomap_output_to_sam_nh_filtered", + str(in_file), + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(out_file, 'r') as expected: + with open(out_file, "r") as expected: assert captured.out == expected.read() - diff --git a/scripts/tests/test_primir_quantification.py b/scripts/tests/test_primir_quantification.py index f276bd4e..55c65023 100644 --- a/scripts/tests/test_primir_quantification.py +++ b/scripts/tests/test_primir_quantification.py @@ -6,12 +6,7 @@ import pytest -sys.path.append("../../") - -from scripts.primir_quantification import ( - main, - parse_arguments -) +from ..primir_quantification import main, parse_arguments @pytest.fixture @@ -56,7 +51,7 @@ def bed_id_files(): out_table = Path("files/id_primir_quantification") return in_bed, out_table - + @pytest.fixture def bed_some_extension_files(): @@ -66,6 +61,7 @@ def bed_some_extension_files(): return in_bed, out_table + @pytest.fixture def bed_collapsed_file(): """Import path to test files with full content.""" @@ -91,9 +87,11 @@ def test_no_input(self, monkeypatch): """Call without input file.""" with pytest.raises(SystemExit) as sysex: monkeypatch.setattr( - sys, 'argv', - ['primir_quantification', - ] + sys, + "argv", + [ + "primir_quantification", + ], ) parse_arguments().parse_args() assert sysex.value.code == 2 @@ -103,42 +101,50 @@ def test_correct_input(self, monkeypatch, bed_file): in_bed, out_table = bed_file monkeypatch.setattr( - sys, 'argv', - ['primir_quantification', - str(in_bed), - ] + sys, + "argv", + [ + "primir_quantification", + str(in_bed), + ], ) args = parse_arguments().parse_args() assert isinstance(args, argparse.Namespace) - + def test_too_many_input_files(self, monkeypatch, bed_file): """Call with too many input file.""" in_bed, out_table = bed_file - + with pytest.raises(SystemExit) as sysex: monkeypatch.setattr( - sys, 'argv', - ['primir_quantification', - str(in_bed), str(in_bed), - ] + sys, + "argv", + [ + "primir_quantification", + str(in_bed), + str(in_bed), + ], ) parse_arguments().parse_args() assert sysex.value.code == 2 - + def test_all_input(self, monkeypatch, bed_file): """Call with all the options.""" in_bed, out_table = bed_file monkeypatch.setattr( - sys, 'argv', - ['primir_quantification', - str(in_bed), - '--id', "name", - '--feat-extension', - '--read-ids', - '--collapsed', - '--nh' - ] + sys, + "argv", + [ + "primir_quantification", + str(in_bed), + "--id", + "name", + "--feat-extension", + "--read-ids", + "--collapsed", + "--nh", + ], ) args = parse_arguments().parse_args() assert isinstance(args, argparse.Namespace) @@ -152,52 +158,62 @@ def test_main_empty_bed_file(self, monkeypatch, capsys, empty_file): empty_file = empty_file monkeypatch.setattr( - sys, 'argv', - ['primir_quantification', - str(empty_file), - ] + sys, + "argv", + [ + "primir_quantification", + str(empty_file), + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(empty_file, 'r') as out_file: + with open(empty_file, "r") as out_file: assert captured.out == out_file.read() - def test_main_no_extension(self, monkeypatch, capsys, bed_no_extension_files): + def test_main_no_extension( + self, monkeypatch, capsys, bed_no_extension_files + ): """Test main function with no extension in features names.""" in_bed, expected_out = bed_no_extension_files monkeypatch.setattr( - sys, 'argv', - ['primir_quantification', - str(in_bed), - ] + sys, + "argv", + [ + "primir_quantification", + str(in_bed), + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(expected_out, 'r') as out_file: + with open(expected_out, "r") as out_file: assert captured.out == out_file.read() - def test_main_id_extension(self, monkeypatch, capsys, bed_extension_id_files): + def test_main_id_extension( + self, monkeypatch, capsys, bed_extension_id_files + ): """Test main function with extension in feature name and read names.""" in_bed, expected_out = bed_extension_id_files monkeypatch.setattr( - sys, 'argv', - ['primir_quantification', - str(in_bed), - '--feat-extension', - '--read-ids', - ] + sys, + "argv", + [ + "primir_quantification", + str(in_bed), + "--feat-extension", + "--read-ids", + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(expected_out, 'r') as out_file: + with open(expected_out, "r") as out_file: assert captured.out == out_file.read() def test_main_id(self, monkeypatch, capsys, bed_id_files): @@ -205,35 +221,41 @@ def test_main_id(self, monkeypatch, capsys, bed_id_files): in_bed, expected_out = bed_id_files monkeypatch.setattr( - sys, 'argv', - ['primir_quantification', - str(in_bed), - '--read-ids', - ] + sys, + "argv", + [ + "primir_quantification", + str(in_bed), + "--read-ids", + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(expected_out, 'r') as out_file: + with open(expected_out, "r") as out_file: assert captured.out == out_file.read() - def test_main_some_extension_file(self, monkeypatch, capsys, bed_some_extension_files): + def test_main_some_extension_file( + self, monkeypatch, capsys, bed_some_extension_files + ): """Test main function with read names.""" in_bed, expected_out = bed_some_extension_files monkeypatch.setattr( - sys, 'argv', - ['primir_quantification', - str(in_bed), - '--feat-extension', - ] + sys, + "argv", + [ + "primir_quantification", + str(in_bed), + "--feat-extension", + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(expected_out, 'r') as out_file: + with open(expected_out, "r") as out_file: assert captured.out == out_file.read() def test_main_collpased_nh_file(self, monkeypatch, capsys, bed_file): @@ -241,36 +263,42 @@ def test_main_collpased_nh_file(self, monkeypatch, capsys, bed_file): in_bed, expected_out = bed_file monkeypatch.setattr( - sys, 'argv', - ['primir_quantification', - str(in_bed), - '--collapsed', - '--nh', - ] + sys, + "argv", + [ + "primir_quantification", + str(in_bed), + "--collapsed", + "--nh", + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(expected_out, 'r') as out_file: + with open(expected_out, "r") as out_file: assert captured.out == out_file.read() - def test_main_collpased_file(self, monkeypatch, capsys, bed_collapsed_file): + def test_main_collpased_file( + self, monkeypatch, capsys, bed_collapsed_file + ): """Test main function with collapsed alignments.""" in_bed, expected_out = bed_collapsed_file monkeypatch.setattr( - sys, 'argv', - ['primir_quantification', - str(in_bed), - '--collapsed', - ] + sys, + "argv", + [ + "primir_quantification", + str(in_bed), + "--collapsed", + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(expected_out, 'r') as out_file: + with open(expected_out, "r") as out_file: assert captured.out == out_file.read() def test_main_nh_file(self, monkeypatch, capsys, bed_nh_file): @@ -278,15 +306,17 @@ def test_main_nh_file(self, monkeypatch, capsys, bed_nh_file): in_bed, expected_out = bed_nh_file monkeypatch.setattr( - sys, 'argv', - ['primir_quantification', - str(in_bed), - '--nh', - ] + sys, + "argv", + [ + "primir_quantification", + str(in_bed), + "--nh", + ], ) args = parse_arguments().parse_args() main(args) captured = capsys.readouterr() - with open(expected_out, 'r') as out_file: - assert captured.out == out_file.read() + with open(expected_out, "r") as out_file: + assert captured.out == out_file.read() diff --git a/scripts/validation_fasta.py b/scripts/validation_fasta.py index ed0e65e2..5d786151 100755 --- a/scripts/validation_fasta.py +++ b/scripts/validation_fasta.py @@ -15,68 +15,65 @@ # ARGUMENTS # parser = ArgumentParser( - description=__doc__, - formatter_class=RawDescriptionHelpFormatter - ) + description=__doc__, formatter_class=RawDescriptionHelpFormatter +) parser.add_argument( - '-v', '--version', - action='version', - version='%(prog)s 1.0', - help="Show program's version number and exit" - ) + "-v", + "--version", + action="version", + version="%(prog)s 1.0", + help="Show program's version number and exit", +) parser.add_argument( - '--trim', + "--trim", help=( "Character's used to trim the ID. Remove anything that follows the " - "character's. Write \\ infront of \'.\' and \'-\' " - "(i.e trim=\"$\\.\\-|_\"). Default: first white space" - ), + "character's. Write \\ infront of '.' and '-' " + '(i.e trim="$\\.\\-|_"). Default: first white space' + ), type=str, - nargs='?', - default="" - ) + nargs="?", + default="", +) parser.add_argument( - '--idlist', - help="Generate text file with the sequences IDs. One ID per line." - ) + "--idlist", + help="Generate text file with the sequences IDs. One ID per line.", +) parser.add_argument( - '-f', '--filter', + "-f", + "--filter", help=( "Input ID list. Filter IDs and sequences from FASTA file with the " "mode selected. Filter file must contain ONE ID per line" - ), - ) + ), +) parser.add_argument( - '-m', '--mode', + "-m", + "--mode", help=( "Type of filtering fasta file: keep (k) or discard (d) IDs contained " "in the ID list file." - ), - choices=('k', 'd') - ) + ), + choices=("k", "d"), +) parser.add_argument( - '-r', '--remove', + "-r", + "--remove", help="Remove sequences from FASTA file longer than specified length.", - type=int - ) + type=int, +) parser.add_argument( - '-i', '--input', - required=True, - help="Input FASTA file", - type=str - ) -parser.add_argument( - '-o', '--output', - help="Output FASTA file" - ) + "-i", "--input", required=True, help="Input FASTA file", type=str +) +parser.add_argument("-o", "--output", help="Output FASTA file") args = parser.parse_args() if args.filter and not args.mode: sys.exit( - "ERROR! Mode argument required when using filter option. " - "(--mode, -m). See --help option." - ) + "ERROR! Mode argument required when using filter option. " + "(--mode, -m). See --help option." + ) # PARSE FASTA FILE # @@ -91,8 +88,8 @@ def __init__(self): self.features = "" -if args.input.endswith('.gz'): - f = gzip.open(args.input, 'rt') +if args.input.endswith(".gz"): + f = gzip.open(args.input, "rt") else: f = open(args.input, encoding="utf-8") @@ -105,15 +102,15 @@ def __init__(self): sys.stdout.write("Parsing FASTA file...") for line in f: - if re.match(r'^>', line): + if re.match(r"^>", line): nrec += 1 record.append(Seq()) # define id of the record if not args.trim: - mobj = re.match(r'^>(\S*)(.*)', line) + mobj = re.match(r"^>(\S*)(.*)", line) else: - mobj = re.match(f'^>([^{args.trim}]*)(.*)', line) + mobj = re.match(f"^>([^{args.trim}]*)(.*)", line) # add id and features if mobj: @@ -125,7 +122,7 @@ def __init__(self): inseq = 1 record[nrec].seq = line else: - cstring = record[nrec].seq+line + cstring = record[nrec].seq + line record[nrec].seq = cstring sys.stdout.write("DONE\n") @@ -136,7 +133,7 @@ def __init__(self): sys.stdout.write("Filtering FASTA file...") with open(args.filter, encoding="utf-8") as filter_file: - id_filter = [line.rstrip('\n') for line in filter_file] + id_filter = [line.rstrip("\n") for line in filter_file] sys.stdout.write("DONE\n") @@ -146,37 +143,40 @@ def __init__(self): if args.output: sys.stdout.write("Writing FASTA file...") - with open(args.output, 'w', encoding="utf-8") as output: - - if args.filter and args.mode == 'k': + with open(args.output, "w", encoding="utf-8") as output: + if args.filter and args.mode == "k": if args.remove: - for x in range(0, nrec+1): - if record[x].id in id_filter and\ - len(record[x].seq)-1 <= args.remove: + for x in range(0, nrec + 1): + if ( + record[x].id in id_filter + and len(record[x].seq) - 1 <= args.remove + ): output.write(f">{record[x].id}\n{record[x].seq}") else: - for x in range(0, nrec+1): + for x in range(0, nrec + 1): if record[x].id in id_filter: output.write(f">{record[x].id}\n{record[x].seq}") - elif args.filter and args.mode == 'd': + elif args.filter and args.mode == "d": if args.remove: - for x in range(0, nrec+1): - if record[x].id not in id_filter and\ - len(record[x].seq)-1 <= args.remove: + for x in range(0, nrec + 1): + if ( + record[x].id not in id_filter + and len(record[x].seq) - 1 <= args.remove + ): output.write(f">{record[x].id}\n{record[x].seq}") else: - for x in range(0, nrec+1): + for x in range(0, nrec + 1): if record[x].id not in id_filter: output.write(f">{record[x].id}\n{record[x].seq}") else: if args.remove: - for x in range(0, nrec+1): - if len(record[x].seq)-1 <= args.remove: + for x in range(0, nrec + 1): + if len(record[x].seq) - 1 <= args.remove: output.write(f">{record[x].id}\n{record[x].seq}") else: - for x in range(0, nrec+1): + for x in range(0, nrec + 1): output.write(f">{record[x].id}\n{record[x].seq}") sys.stdout.write("DONE\n") @@ -188,14 +188,15 @@ def __init__(self): if args.idlist: sys.stdout.write("Creating IDs list from FASTA file...") - with (open(args.idlist, 'w', encoding="utf-8") as id_list, - open(args.output, 'r', encoding="utf-8") as fasta): - + with ( + open(args.idlist, "w", encoding="utf-8") as id_list, + open(args.output, "r", encoding="utf-8") as fasta, + ): for line in fasta: - if line.startswith('>'): + if line.startswith(">"): idlist.append(line[1:]) idlist.sort() - id_list.write(''.join(idlist)) + id_list.write("".join(idlist)) id_list.close() sys.stdout.write("DONE\n")