Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

style: reformat unit test modules #125

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:

- name: flake8
working-directory: ./scripts
run: flake8 ./*.py
run: flake8 --ignore E402,W504
deliaBlue marked this conversation as resolved.
Show resolved Hide resolved

- name: mypy
working-directory: ./scripts
Expand Down
17 changes: 8 additions & 9 deletions scripts/filter_multimappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,20 @@ def parse_arguments():
"""Command-line arguments parser."""
parser = argparse.ArgumentParser(
description="Script to filter multimappers by indel counts."
)
)

parser.add_argument(
'-v', '--version',
action='version',
version='%(prog)s 1.0',
help="Show program's version number and exit"
)
)

parser.add_argument(
'infile',
help="Path to the SAM input file, sorted by query name.",
type=Path
)
)

parser.add_argument(
'--nh',
Expand Down Expand Up @@ -128,17 +128,16 @@ def find_best_alignments(

aln_indels = [(aln, count_indels(aln=aln)) for aln in alns]
min_indels = min(aln_indels, key=lambda x: x[1])[1]
best_alignments = [
aln
for i, (aln, indels) in enumerate(aln_indels)
if indels == min_indels]
best_alignments = [aln
for i, (aln, indels) in enumerate(aln_indels)
if indels == min_indels]
deliaBlue marked this conversation as resolved.
Show resolved Hide resolved

for i, best_aln in enumerate(best_alignments):

if nh:
name = (
f'{best_aln.query_name}_{len(best_alignments)}'
)
f'{best_aln.query_name}_{len(best_alignments)}'
)
best_aln.query_name = name

best_aln.set_tag('NH', len(best_alignments))
Expand Down
7 changes: 3 additions & 4 deletions scripts/iso_name_tagging.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,8 @@ def attributes_dictionary(attr: str) -> Dict[str, str]:
if len(pairs[0].split('=')) == 2:
attr_dict = {p.split('=')[0].lower(): p.split('=')[1] for p in pairs}
else:
attr_dict = {
p.split('"')[0].strip().lower(): p.split('"')[1]
for p in pairs}
attr_dict = {p.split('"')[0].strip().lower(): p.split('"')[1]
for p in pairs}

return attr_dict

Expand Down Expand Up @@ -206,7 +205,7 @@ def get_tags(
intersecting_mirna: list,
alignment: pysam.AlignedSegment,
extension: int
) -> set:
) -> set:
"""Get tag for alignment.

Given an alignment and a list containing the feature name, start position,
Expand Down
17 changes: 9 additions & 8 deletions scripts/mirna_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,8 @@ def extend_mirnas(
if seq_lengths is None:
seq_lengths = {}
for seqid in self.db.seqids():
seq_lengths[seqid] = max(
rec.end
for rec in self.db.region(seqid))
seq_lengths[seqid] = max(rec.end
for rec in self.db.region(seqid))

with (open(primir_out, 'w', encoding="utf-8") as primir,
open(mir_out, 'w', encoding="utf-8") as mirna):
Expand Down Expand Up @@ -145,7 +144,7 @@ def parse_arguments():
"""Command-line arguments parser."""
parser = argparse.ArgumentParser(
description="Script to extend miRNAs start and end coordinates."
)
)
parser.add_argument(
'-v', '--version',
action='version',
Expand Down Expand Up @@ -188,10 +187,12 @@ def main(arguments) -> None:
outdir = Path(arguments.outdir)
outdir.mkdir(parents=True, exist_ok=True)

primir_out = outdir/(
f"extended_primir_annotation_{arguments.extension}_nt.gff3"
)
mir_out = outdir/f"extended_mirna_annotation_{arguments.extension}_nt.gff3"
primir_out = outdir / (
f"extended_primir_annotation_{arguments.extension}_nt.gff3"
)
mir_out = outdir / (
f"extended_mirna_annotation_{arguments.extension}_nt.gff3"
)

with open(arguments.input, encoding="utf-8") as in_file:
if len(in_file.read()) == 0:
Expand Down
28 changes: 15 additions & 13 deletions scripts/mirna_quantification.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,16 +285,18 @@ def collapsed_contribution(aln: pysam.AlignedSegment) -> float:
collapsed = float(coll.group())

except AttributeError:
sys.stdout.write(f"Invalid query name: '{aln.query_name}'.\n" +
"Option --collapsed specified but query name does" +
" not include the number of collapsed sequences.\n" +
"Check SAM file consistency and CLI options" +
" --collapsed and --nh.\n")
sys.stdout.write(
f"Invalid query name: '{aln.query_name}'.\n" +
"Option --collapsed specified but query name does " +
"not include the number of collapsed sequences.\n" +
"Check SAM file consistency and CLI options" +
" --collapsed and --nh.\n"
)
deliaBlue marked this conversation as resolved.
Show resolved Hide resolved
raise

try:
nh_value = float(aln.get_tag("NH"))
return collapsed/nh_value
return collapsed / nh_value

except KeyError:
return collapsed
Expand All @@ -320,7 +322,7 @@ def nh_contribution(aln: pysam.AlignedSegment) -> float:
if (cont := re.search(r'\d+$', name)):
nh_val = float(cont.group())

return 1/nh_val
return 1 / nh_val

except AttributeError:
sys.stdout.write(f"Invalid query name: '{aln.query_name}'.\n" +
Expand All @@ -346,7 +348,7 @@ def contribution(aln: pysam.AlignedSegment) -> float:
the conrtibution of the alignment to the overall count
"""
try:
return 1/float(aln.get_tag("NH"))
return 1 / float(aln.get_tag("NH"))

except KeyError:
return 1.0
Expand Down Expand Up @@ -394,13 +396,13 @@ def main(arguments) -> None:
"""Quantify miRNAs and corresponding isomiRs."""
outdir = Path(arguments.outdir)
outdir.mkdir(parents=True, exist_ok=True)
outfile = outdir/f'mirna_counts_{arguments.lib}'
outfile = outdir / f'mirna_counts_{arguments.lib}'

contribution_type = {
(True, True): collapsed_nh_contribution,
(True, False): collapsed_contribution,
(False, True): nh_contribution,
(False, False): contribution}
(True, True): collapsed_nh_contribution,
(True, False): collapsed_contribution,
(False, True): nh_contribution,
(False, False): contribution}

get_contribution = contribution_type[arguments.collapsed, arguments.nh]

Expand Down
4 changes: 2 additions & 2 deletions scripts/nh_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
def main():
"""Filter alignments by NH tag."""
sys.stdout.write(
f"Removing reads aligned more than {sys.argv[2]} times... \n"
)
f"Removing reads aligned more than {sys.argv[2]} times... \n"
)

infile = pysam.Samfile(sys.argv[1], "r", check_sq=False)
out = pysam.Samfile(sys.argv[3], "w", template=infile)
Expand Down
4 changes: 2 additions & 2 deletions scripts/oligomap_output_to_sam_nh_filtered.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,8 @@ def eval_aln(nhfilter: int, d: Dict[str, list], min_err_nh: Dict[str, list],
errors = fields.edit_dist[-1]

if len(d) == 0:
if (seq_name not in list(min_err_nh.keys()) or
errors < min_err_nh[seq_name][0]):
if (seq_name not in list(
min_err_nh.keys()) or errors < min_err_nh[seq_name][0]):

min_err_nh[seq_name] = [errors, 1]
d[seq_name] = [fields]
Expand Down
10 changes: 5 additions & 5 deletions scripts/primir_quantification.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def parse_arguments():
"""Command-line arguments parser."""
parser = argparse.ArgumentParser(
description=__doc__
)
)
parser.add_argument(
'-v', '--version',
action='version',
Expand Down Expand Up @@ -111,9 +111,9 @@ def attributes_dictionary(attr: str) -> Dict[str, str]:
attr_dict = {p.split('=')[0].lower(): p.split('=')[1] for p in pairs}
else:
attr_dict = {
p.split('"')[0].strip().lower(): p.split('"')[1]
for p in pairs
}
p.split('"')[0].strip().lower(): p.split('"')[1]
for p in pairs
}

return attr_dict

Expand All @@ -138,7 +138,7 @@ def get_contribution(query_id: str,
num_reads = 1
nh_value = 1

return num_reads/nh_value
return num_reads / nh_value


def get_initial_data(name: str, feat_extension: bool) -> list[str]:
Expand Down
36 changes: 20 additions & 16 deletions scripts/tests/test_filter_multimappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
def sam_empty_file():
"""Import path to empty test file."""
empty_file = Path("files/header_only.sam")

return empty_file


Expand All @@ -42,6 +42,7 @@ def sam_no_multimappers_file():

return no_multi


@pytest.fixture
def sam_unique_diff_multimappers_files():
"""Import path to test files with a single multimapper."""
Expand All @@ -50,6 +51,7 @@ def sam_unique_diff_multimappers_files():

return in_diff_multi, out_diff_multi


@pytest.fixture
def sam_unique_equal_multimapper_files():
"""Import path to the test file with a single multimapper."""
Expand All @@ -58,21 +60,19 @@ def sam_unique_equal_multimapper_files():

return in_sam, out_sam


@pytest.fixture
def sam_sec_sup_files():
"""
Import path to the test files with secondary and supplementary alignments.
"""
"""Import path to the test files with secondary and supp. alignments."""
in_sam = Path("files/in_sec_sup.sam")
out_sam = Path("files/sec_sup.sam")

return in_sam, out_sam


@pytest.fixture
def sam_multimappers_nh_files():
"""
Import path to test files with multimappers and the NH tag in the query name.
"""
"""Import path to test files with multimappers and NH tag in the name."""
in_multimappers = Path("files/in_multimappers.sam")
out_multimappers = Path("files/multimappers_nh.sam")

Expand Down Expand Up @@ -163,7 +163,7 @@ def test_correct_input(self, monkeypatch, sam_no_multimappers_file):
)
args = parse_arguments().parse_args()
assert isinstance(args, argparse.Namespace)

def test_all_input_options(self, monkeypatch, sam_no_multimappers_file):
"""Call with a single input file and the --nh option."""
sam_1 = sam_no_multimappers_file
Expand Down Expand Up @@ -236,7 +236,7 @@ def test_find_best_alignments_equal_multimappers(self, alns):
assert output[1].get_tag("NH") == 2
assert output[0].get_tag("HI") == 1
assert output[1].get_tag("HI") == 2

def test_find_best_alignments_multimappers_nh(self, alns):
"""Test function with multimappers with different indel count."""
output = find_best_alignments([alns[0], alns[1]], True)
Expand Down Expand Up @@ -274,13 +274,13 @@ def test_write_output_one_alignment(self, capsys, sam_multimappers_files):

with pysam.AlignmentFile(in_sam, 'r') as in_file:
alignment = next(in_file)

write_output([alignment])
captured = capsys.readouterr()

with pysam.AlignmentFile(out_sam, 'r') as out_file:
out_alignment = next(out_file)

assert captured.out == out_alignment.to_string() + '\n'


Expand All @@ -304,7 +304,8 @@ def test_main_empty_file(self, capsys, monkeypatch, sam_empty_file):
with open(empty_file, 'r') as out_file:
assert captured.out == out_file.read()

def test_main_multimappers(self, capsys, monkeypatch, sam_multimappers_files):
def test_main_multimappers(self, capsys, monkeypatch,
sam_multimappers_files):
"""Test main function with multimappers."""
in_sam, out_sam = sam_multimappers_files

Expand All @@ -321,7 +322,8 @@ def test_main_multimappers(self, capsys, monkeypatch, sam_multimappers_files):
with open(out_sam, 'r') as out_file:
assert captured.out == out_file.read()

def test_main_multimappers_nh(self, capsys, monkeypatch, sam_multimappers_nh_files):
def test_main_multimappers_nh(self, capsys, monkeypatch,
sam_multimappers_nh_files):
"""Test main function with multimappers with nh argument."""
in_sam, out_sam = sam_multimappers_nh_files

Expand All @@ -339,7 +341,8 @@ def test_main_multimappers_nh(self, capsys, monkeypatch, sam_multimappers_nh_fil
with open(out_sam, 'r') as out_file:
assert captured.out == out_file.read()

def test_main_no_multimappers(self, capsys, monkeypatch, sam_no_multimappers_file):
def test_main_no_multimappers(self, capsys, monkeypatch,
sam_no_multimappers_file):
"""Test main function with no multimappers."""
sam_file = sam_no_multimappers_file

Expand All @@ -355,8 +358,9 @@ def test_main_no_multimappers(self, capsys, monkeypatch, sam_no_multimappers_fil

with open(sam_file, 'r') as out_file:
assert captured.out == out_file.read()

def test_main_secondary_supplementary(self, capsys, monkeypatch, sam_sec_sup_files):

def test_main_secondary_supplementary(self, capsys, monkeypatch,
sam_sec_sup_files):
"""Test main function with secondary and supplementary alignments."""
in_sam, out_sam = sam_sec_sup_files

Expand Down
Loading