Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

style: reformat unit test modules #125

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:

- name: flake8
working-directory: ./scripts
run: flake8 ./*.py
run: flake8

- name: mypy
working-directory: ./scripts
Expand All @@ -48,6 +48,12 @@ jobs:
working-directory: ./scripts
run: pylint --rcfile=../pylint.cfg ./*.py

- name: black
uses: psf/black@stable
with:
options: "--check --verbose --line-length=79"
src: "./scripts"


snakemake-format-graph-test:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -149,4 +155,4 @@ jobs:

- name: run unit tests
working-directory: ./scripts/tests
run: pytest --cov=scripts --cov-branch --cov-report=term-missing
run: pytest --import-mode prepend --cov=scripts --cov-branch --cov-report=term-missing
1 change: 1 addition & 0 deletions scripts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""MIRFLOWZ scripts."""
57 changes: 28 additions & 29 deletions scripts/filter_multimappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,30 @@ def parse_arguments():
"""Command-line arguments parser."""
parser = argparse.ArgumentParser(
description="Script to filter multimappers by indel counts."
)
)

parser.add_argument(
'-v', '--version',
action='version',
version='%(prog)s 1.0',
help="Show program's version number and exit"
)
"-v",
"--version",
action="version",
version="%(prog)s 1.0",
help="Show program's version number and exit",
)

parser.add_argument(
'infile',
"infile",
help="Path to the SAM input file, sorted by query name.",
type=Path
)
type=Path,
)

parser.add_argument(
'--nh',
"--nh",
help=(
"If set, the NH tag will be include in the alignment name after"
"and underscore. Default: %(default)s."
),
action='store_true',
default=False
action="store_true",
default=False,
)

return parser
Expand Down Expand Up @@ -101,7 +102,7 @@ def count_indels(aln: pysam.libcalignedsegment.AlignedSegment) -> int:


def find_best_alignments(
alns: List[pysam.AlignedSegment], nh: bool = False
alns: List[pysam.AlignedSegment], nh: bool = False
) -> List[pysam.AlignedSegment]:
"""Find alignments with less indels.

Expand Down Expand Up @@ -129,20 +130,18 @@ def find_best_alignments(
aln_indels = [(aln, count_indels(aln=aln)) for aln in alns]
min_indels = min(aln_indels, key=lambda x: x[1])[1]
best_alignments = [
aln
for i, (aln, indels) in enumerate(aln_indels)
if indels == min_indels]
aln
for i, (aln, indels) in enumerate(aln_indels)
if indels == min_indels
]

for i, best_aln in enumerate(best_alignments):

if nh:
name = (
f'{best_aln.query_name}_{len(best_alignments)}'
)
name = f"{best_aln.query_name}_{len(best_alignments)}"
best_aln.query_name = name

best_aln.set_tag('NH', len(best_alignments))
best_aln.set_tag('HI', i + 1)
best_aln.set_tag("NH", len(best_alignments))
best_aln.set_tag("HI", i + 1)

return best_alignments

Expand All @@ -154,20 +153,18 @@ def write_output(alns: List[pysam.AlignedSegment]) -> None:
alignments: alignments with the same query name
"""
for alignment in alns:
sys.stdout.write(alignment.to_string() + '\n')
sys.stdout.write(alignment.to_string() + "\n")


def main(arguments) -> None:
"""Filter multimappers by indels count."""
with pysam.AlignmentFile(arguments.infile, "r") as samfile:

sys.stdout.write(str(samfile.header))

current_alignments: list[pysam.AlignedSegment] = []
current_query = None

for alignment in samfile:

if alignment.is_supplementary:
continue

Expand All @@ -178,16 +175,18 @@ def main(arguments) -> None:
current_alignments.append(alignment)

else:
current_alignments = find_best_alignments(current_alignments,
arguments.nh)
current_alignments = find_best_alignments(
current_alignments, arguments.nh
)
write_output(alns=current_alignments)

current_query = alignment.query_name
current_alignments = [alignment]

if len(current_alignments) > 0:
current_alignments = find_best_alignments(current_alignments,
arguments.nh)
current_alignments = find_best_alignments(
current_alignments, arguments.nh
)
write_output(alns=current_alignments)


Expand Down
117 changes: 66 additions & 51 deletions scripts/iso_name_tagging.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,70 +92,73 @@ def parse_arguments():
"""Command-line arguments parser."""
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
'-v', '--version',
action='version',
version='%(prog)s 1.0.0',
help="Show program's version number and exit"
"-v",
"--version",
action="version",
version="%(prog)s 1.0.0",
help="Show program's version number and exit",
)
parser.add_argument(
'-b', '--bed',
"-b",
"--bed",
help=(
"Path to the BED file. This file must be the output of "
" a bedtools intersect call with -a being a GFF3 file and"
" -b a BAM file."
),
type=Path,
required=True
required=True,
)
parser.add_argument(
'-s', '--sam',
"-s",
"--sam",
help="Path to the SAM input file.",
type=Path,
required=True
required=True,
)
parser.add_argument(
'-e', '--extension',
"-e",
"--extension",
help=(
"Number of nucleotides the start and end coordinates of the"
" annotated features had been extended. Default: %(default)d."
),
default=0,
type=int
type=int,
)
parser.add_argument(
'--id',
"--id",
help=(
"ID used to identify the feature in the name that is added as tag."
" The ID must be in lowercase. Default: %(default)s."
),
default="name",
type=str
type=str,
)

return parser


def attributes_dictionary(attr: str) -> Dict[str, str]:
"""Create attributes dicctionary."""
pairs = attr.split(';')
pairs = attr.split(";")

if len(pairs[0].split('=')) == 2:
attr_dict = {p.split('=')[0].lower(): p.split('=')[1] for p in pairs}
if len(pairs[0].split("=")) == 2:
attr_dict = {p.split("=")[0].lower(): p.split("=")[1] for p in pairs}
else:
attr_dict = {
p.split('"')[0].strip().lower(): p.split('"')[1]
for p in pairs}
p.split('"')[0].strip().lower(): p.split('"')[1] for p in pairs
}

return attr_dict


def parse_intersect_output(
intersect_file: Path,
ID: str = "name",
extension: int = 0) -> Optional[Dict[Optional[str], list]]:
intersect_file: Path, ID: str = "name", extension: int = 0
) -> Optional[Dict[Optional[str], list]]:
"""Parse intersect BED file.

Given a BED file generated by intersecting a GFF file (-a) with a BAM file
Expand All @@ -176,25 +179,39 @@ def parse_intersect_output(
adjusted. Defaults to 0.
"""
intersect_data = defaultdict(list)
Fields = namedtuple('Fields', ("feat_chr", "source", "feat_type",
"feat_start", "feat_end", "feat_score",
"strand", "phase", "feat_attributes",
"read_chr", "read_start", "read_end",
"read_name", "read_score", "read_strand",
"overlap_len"))

with open(intersect_file, 'r', encoding="utf-8") as bedfile:
for line in bedfile:
Fields = namedtuple(
"Fields",
(
"feat_chr",
"source",
"feat_type",
"feat_start",
"feat_end",
"feat_score",
"strand",
"phase",
"feat_attributes",
"read_chr",
"read_start",
"read_end",
"read_name",
"read_score",
"read_strand",
"overlap_len",
),
)

fields = Fields(*line.strip().split('\t'))
with open(intersect_file, "r", encoding="utf-8") as bedfile:
for line in bedfile:
fields = Fields(*line.strip().split("\t"))

miRNA_name = attributes_dictionary(fields.feat_attributes)[ID]
miRNA_start = int(fields.feat_start) + extension
miRNA_end = int(fields.feat_end) - extension

intersect_data[fields.read_name].append((miRNA_name,
miRNA_start,
miRNA_end))
intersect_data[fields.read_name].append(
(miRNA_name, miRNA_start, miRNA_end)
)

if not intersect_data:
return None
Expand All @@ -203,10 +220,8 @@ def parse_intersect_output(


def get_tags(
intersecting_mirna: list,
alignment: pysam.AlignedSegment,
extension: int
) -> set:
intersecting_mirna: list, alignment: pysam.AlignedSegment, extension: int
) -> set:
"""Get tag for alignment.

Given an alignment and a list containing the feature name, start position,
Expand All @@ -233,7 +248,7 @@ def get_tags(
set of strings containing the new tag
"""
cigar = alignment.cigarstring
md = alignment.get_tag('MD')
md = alignment.get_tag("MD")
limit = extension + 1

tags = []
Expand All @@ -243,19 +258,18 @@ def get_tags(
shift_3p = alignment.reference_end - miRNA_end

if -limit < shift_5p < limit and -limit < shift_3p < limit:
tags.append(f'{miRNA_name}|{shift_5p}|{shift_3p}|{cigar}|{md}')
tags.append(f"{miRNA_name}|{shift_5p}|{shift_3p}|{cigar}|{md}")

return set(tags)


def main(arguments) -> None:
"""Add intersecting feature(s) into a SAM file as a tag."""
intersect_data = parse_intersect_output(arguments.bed,
arguments.id,
arguments.extension)

with pysam.AlignmentFile(arguments.sam, 'r') as samfile:
intersect_data = parse_intersect_output(
arguments.bed, arguments.id, arguments.extension
)

with pysam.AlignmentFile(arguments.sam, "r") as samfile:
sys.stdout.write(str(samfile.header))

if intersect_data is None:
Expand All @@ -265,15 +279,16 @@ def main(arguments) -> None:
alignment_id = alignment.query_name
intersecting_miRNAs = intersect_data[alignment_id]

tags = get_tags(intersecting_mirna=intersecting_miRNAs,
alignment=alignment,
extension=arguments.extension)
tags = get_tags(
intersecting_mirna=intersecting_miRNAs,
alignment=alignment,
extension=arguments.extension,
)

alignment.set_tag('YW', ';'.join(tags))
sys.stdout.write(alignment.to_string() + '\n')
alignment.set_tag("YW", ";".join(tags))
sys.stdout.write(alignment.to_string() + "\n")


if __name__ == "__main__":

args = parse_arguments().parse_args() # pragma: no cover
main(args) # pragma: no cover
Loading