-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
186 additions
and
60 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
__pycache__ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,92 +1,168 @@ | ||
import sys | ||
import argparse | ||
from argparse import ArgumentParser | ||
from itertools import dropwhile, groupby | ||
|
||
BLANK_POSITION = "-" | ||
DEFAULT_QUALITY = 60 | ||
DEFAULT_REF_NAME = "ref" | ||
VALID_CHARACTERS = set("ACGT").union(BLANK_POSITION) | ||
SAM_ROW_DELIMITER = "\n" | ||
SAM_COL_DELIMITER = "\t" | ||
SAM_VERSION_NUMBER = "1.4" | ||
|
||
def remove_blanks(seq): | ||
return "".join(c for c in seq if c != "-") | ||
|
||
def is_valid(char): | ||
return char in VALID_CHARACTERS | ||
|
||
|
||
def present(base): | ||
return base != BLANK_POSITION | ||
|
||
|
||
def missing(base): | ||
return not present(base) | ||
|
||
|
||
def remove_all_blanks(sequence): | ||
return sequence.replace(BLANK_POSITION, "") | ||
|
||
|
||
def remove_whitespaces(lines): | ||
return ["".join(line.split()) for line in lines if line.strip()] | ||
|
||
|
||
def remove_trailing_blanks(sequence): | ||
return sequence.rstrip(BLANK_POSITION) | ||
|
||
def cigar(read, ref): | ||
def transform(read_base, ref_base): | ||
if read_base == '-' and ref_base == '-': | ||
return "" | ||
elif read_base == "-" and ref_base != "-": | ||
return "D" | ||
elif read_base != "-" and ref_base == "-": | ||
return "I" | ||
else: | ||
return "M" | ||
|
||
grouper = groupby(transform(b1, b2) for b1, b2 in zip(read, ref)) | ||
return "".join(str(len(list(v))) + k for k, v in grouper if k) | ||
def check_legal_characters(lines): | ||
for row, line in enumerate(lines, 1): | ||
for col, char in enumerate(line, 1): | ||
assert is_valid(char), \ | ||
f"Found illegal character '{char}' (line: {row}, position: {col})" | ||
|
||
|
||
def perform_checks(lines): | ||
assert len(lines) >= 2, "Provide at least one read and a reference" | ||
check_legal_characters(lines) | ||
|
||
|
||
def mark_operation(read_base, ref_base): | ||
if missing(read_base) and missing(ref_base): | ||
return None | ||
elif missing(read_base) and present(ref_base): | ||
return "D" | ||
elif present(read_base) and missing(ref_base): | ||
return "I" | ||
else: | ||
return "M" | ||
|
||
|
||
def cigar_entry(operation, iterator): | ||
return str(sum(1 for _ in iterator)) + operation | ||
|
||
|
||
def cigar(read, ref): | ||
operations = (mark_operation(*bases) for bases in zip(read, ref)) | ||
grouper = groupby(op for op in operations if op) | ||
return "".join(cigar_entry(op, values) for op, values in grouper if op) | ||
|
||
|
||
def make_sam_entry(index, read, reference): | ||
shifted_read = "".join(dropwhile(lambda c: c == "-", read)) | ||
shifted_read = "".join(dropwhile(missing, read)) | ||
read_offset = len(read) - len(shifted_read) | ||
shifted_reference = reference[read_offset:] | ||
real_read = remove_blanks(shifted_read) | ||
real_read = remove_all_blanks(shifted_read) | ||
|
||
return (index, 0, "ref", | ||
read_offset + 1, 60, | ||
read_offset + 1, DEFAULT_QUALITY, | ||
cigar(shifted_read, shifted_reference), | ||
"*", 0, 0, real_read, "@"*len(real_read)) | ||
|
||
def sam_sort_key(entry): | ||
|
||
def make_reference_file(ref_sequence, ref_file_name): | ||
with open(ref_file_name, "w") as ref_file: | ||
ref_file.write(f">{DEFAULT_REF_NAME}\n") | ||
ref_file.write(ref_sequence) | ||
ref_file.write("\n") | ||
|
||
|
||
def make_sam_header(ref_length): | ||
SAM_HEADER_R1 = SAM_COL_DELIMITER.join([ | ||
"@HD", | ||
f"VN:{SAM_VERSION_NUMBER}", | ||
"SO:coordinate" | ||
]) | ||
SAM_HEADER_R2 = SAM_COL_DELIMITER.join([ | ||
"@SQ", | ||
f"SN:{DEFAULT_REF_NAME}", | ||
f"LN:{ref_length}" | ||
]) | ||
return SAM_ROW_DELIMITER.join([SAM_HEADER_R1, SAM_HEADER_R2]) | ||
|
||
|
||
def make_alignments_file(sam_body, ref_length, sam_file_name): | ||
sam_header = make_sam_header(ref_length) | ||
|
||
with open(sam_file_name, "w") as sam_file: | ||
sam_file.write(sam_header) | ||
sam_file.write(SAM_ROW_DELIMITER) | ||
sam_file.write(sam_body) | ||
|
||
|
||
def sam_sorter(entry): | ||
return entry[2], entry[3] | ||
|
||
|
||
def format_line(idx, read, reference): | ||
shifted_read = "".join(dropwhile(lambda c: c == "-", read)) | ||
read_offset = len(read) - len(shifted_read) | ||
shifted_reference = reference[read_offset:] | ||
real_read = remove_blanks(shifted_read) | ||
def create_files(reference, reads, ref_file_name, sam_file_name): | ||
real_reference = remove_all_blanks(reference) | ||
make_reference_file(real_reference, ref_file_name) | ||
|
||
return "\t".join((f"read{idx}", "0", | ||
"ref", str(read_offset + 1), "60", | ||
cigar(shifted_read, shifted_reference), "*", "0", | ||
"0", real_read, "@"*len(real_read))) | ||
sam_entries = [make_sam_entry(idx + 1, read, reference) | ||
for idx, read in enumerate(reads)] | ||
sam_entries.sort(key=sam_sorter) | ||
|
||
sam_rows = SAM_COL_DELIMITER.join(str(entry) for entry in sam_entries) | ||
sam_body = SAM_ROW_DELIMITER.join(sam_rows) | ||
|
||
make_alignments_file(sam_body, len(real_reference), sam_file_name) | ||
|
||
|
||
def get_args(): | ||
parser = ArgumentParser(description="""Convert a symbolic alignment file | ||
into a SAM alignemnt file and a corresponding reference.""") | ||
|
||
parser.add_argument("input_file", | ||
help="path to the input file with symbolic alignments.") | ||
parser.add_argument("-r", "--reference", | ||
default="ref.fa", | ||
dest="ref_file_name", | ||
help="desired name of the generated reference sequence file.") | ||
parser.add_argument("-a", "--alignments", | ||
default="alignments.sam", | ||
dest="sam_file_name", | ||
help="desired name of the generated alignments (sam) file.") | ||
|
||
return parser.parse_args() | ||
|
||
|
||
def main(): | ||
parser = argparse.ArgumentParser(description='Process some integers.') | ||
parser.add_argument('input_file', | ||
help='path to the input file') | ||
parser.add_argument("-r", "--reference", default="ref.fa", dest="ref_file_name", | ||
help="name of the reference sequence file") | ||
parser.add_argument("-s", "--sam", default="als.sam", dest="sam_file_name", | ||
help="name of the alignments (sam) file") | ||
|
||
args = parser.parse_args() | ||
args = get_args() | ||
input_file_name = args.input_file | ||
ref_file_name = args.ref_file_name | ||
sam_file_name = args.sam_file_name | ||
|
||
with open(input_file_name) as file: | ||
content = file.readlines() | ||
lines = file.readlines() | ||
|
||
lines = ["".join(line.split()) for line in content if line.strip()] | ||
assert len(lines) >= 2, "Provide at least one reference and a read" | ||
lines = remove_whitespaces(lines) | ||
sequence_strings = [remove_trailing_blanks(l) for l in lines] | ||
check_legal_characters(sequence_strings) | ||
|
||
valid_chars = set("ACGT-") | ||
assert all(all(c in valid_chars for c in line) | ||
for line in lines), "Invalid characters in data" | ||
reference = sequence_strings.pop() | ||
reads = sequence_strings | ||
|
||
reference = lines.pop() | ||
real_reference = remove_blanks(reference) | ||
with open(ref_file_name, "w") as ref_file: | ||
ref_file.write(">ref\n") | ||
ref_file.write(real_reference) | ||
ref_file.write("\n") | ||
|
||
SAM_HEADER = f"@HD\tVN:1.4\tSO:coordinate\n@SQ\tSN:ref\tLN:{len(real_reference)}\n" | ||
sam_entries = sorted((make_sam_entry(idx + 1, read, reference) | ||
for idx, read in enumerate(lines)), key=sam_sort_key) | ||
sam_strings = "\n".join("\t".join(map(str,entry)) for entry in sam_entries) | ||
with open(sam_file_name, "w") as sam_file: | ||
sam_file.write(SAM_HEADER) | ||
sam_file.write(sam_strings) | ||
create_files(reference, reads, ref_file_name, sam_file_name) | ||
|
||
|
||
main() | ||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
import unittest | ||
from sammock import * | ||
|
||
|
||
|
||
class TestCigarFunctions(unittest.TestCase): | ||
def test_mark_operation(self): | ||
self.assertEqual("M", mark_operation("A", "A")) | ||
self.assertEqual("M", mark_operation("A", "C")) | ||
self.assertEqual("I", mark_operation("A", "-")) | ||
self.assertEqual("D", mark_operation("-", "G")) | ||
|
||
def test_cigar_mathces(self): | ||
self.assertEqual("5M", cigar("ACGCT", "ACGGT")) | ||
self.assertEqual("3M", cigar("ACT", "ACGGT")) | ||
self.assertEqual("2M", cigar("ACTTT", "AC")) | ||
self.assertEqual("1M", cigar("A", "ACGGT")) | ||
self.assertEqual("1M", cigar("ACCTG", "C")) | ||
self.assertEqual("1M4D", cigar("A----", "ACGGT")) | ||
self.assertEqual("1M1I", cigar("AD", "A----")) | ||
|
||
def test_cigar_insertions(self): | ||
self.assertEqual("1M2D2M", cigar("A--CT", "ACGGT")) | ||
self.assertEqual("1M3D1M", cigar("A---T", "ACGGT")) | ||
self.assertEqual("2I2M", cigar("CGTT", "--TTT")) | ||
|
||
def test_cigar_deletions(self): | ||
self.assertEqual("1M2I2M", cigar("ADGCG", "A--CG")) | ||
self.assertEqual("1M3I1M", cigar("ADGCG", "A---G")) | ||
self.assertEqual("2D2M", cigar("--TC", "ACGGT")) | ||
|
||
def test_cigar_edge(self): | ||
self.assertEqual("", cigar("", "CGC--TA---AGATTGT")) | ||
self.assertEqual("", cigar("GCGGD", "")) | ||
|
||
|
||
def test_cigar_complex(self): | ||
self.assertEqual("2M2I1M2D3M", cigar("GCTGA--GTC-", "AC--AGCAGG--GC")) | ||
self.assertEqual("4D1M2I6M1D1M2D1M", cigar("------CCATA---AGTA-C--T", "--CACGC--TA---AGTATTATTGT")) | ||
reference = "A-GA-GAC-AC" | ||
for cg, read in [("1D1I2M1I3M", "-CGTACCT"), | ||
("2M1D3M", "A-G--GTA"), | ||
("1D2M1I3M", "--TGACGT"), | ||
("1M1I2M1I2D1M1I", "CGTAC--AC"), | ||
("1M1I1D1M1I3M1I1M", "AC-TCGATGA"), | ||
("1M1I1M1D2M1D2M", "ACG--GA--AC"), | ||
("1D1I4M1D2M", "-CGA-GA--AC")]: | ||
self.assertEqual(cg, cigar(read, reference)) | ||
|