Skip to content

Commit

Permalink
add check for narrow maf annotations vs full failed annotations repor…
Browse files Browse the repository at this point in the history
…t, move filepaths
  • Loading branch information
rxu17 committed Jan 12, 2024
1 parent 192076e commit 80367db
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 34 deletions.
100 changes: 78 additions & 22 deletions genie/process_mutation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import pandas as pd
from synapseclient import Synapse

from . import load, process_functions
from . import extract, load, process_functions

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -199,19 +199,22 @@ def process_mutation_workflow(
logger.info("No mutation data")
return None
# Certificate to use GENIE Genome Nexus

syn.get(
"syn22053204",
ifcollision="overwrite.local",
downloadLocation=genie_config["genie_annotation_pkg"],
# version=1, # TODO: This should pull from a config file in the future
)

# Genome Nexus Jar file
syn.get(
"syn22084320",
ifcollision="overwrite.local",
downloadLocation=genie_config["genie_annotation_pkg"],
# version=13, # TODO: This should pull from a config file in the future
)

annotation_paths = create_annotation_paths(center=center, workdir=workdir)
annotate_mutation(
annotation_paths=annotation_paths,
Expand All @@ -227,18 +230,23 @@ def process_mutation_workflow(
syn=syn,
center=center,
maf_tableid=maf_tableid,
annotated_maf_path=annotation_paths.merged_maf_path,
annotation_paths=annotation_paths,
flatfiles_synid=flatfiles_synid,
workdir=workdir,
)

full_error_report = concat_annotation_error_reports(
center=center,
input_dir=annotation_paths.error_dir,
)

check_annotation_error_reports(
syn=syn,
maf_table_synid=maf_tableid,
full_error_report=full_error_report,
center=center,
)
store_annotation_error_reports(
full_error_report=full_error_report,
full_error_report_path=annotation_paths.full_error_report_path,
syn=syn,
errors_folder_synid=genie_config["center_config"][center]["errorsSynId"],
)
Expand All @@ -259,7 +267,15 @@ def create_annotation_paths(center: str, workdir: str) -> namedtuple:
output_files_dir = tempfile.mkdtemp(dir=workdir)
Filepaths = namedtuple(
"Filepaths",
["input_files_dir", "output_files_dir", "error_dir", "merged_maf_path"],
[
"input_files_dir",
"output_files_dir",
"error_dir",
"merged_maf_path",
"full_maf_path",
"narrow_maf_path",
"full_error_report_path",
],
)
annotation_paths = Filepaths(
input_files_dir=input_files_dir,
Expand All @@ -268,6 +284,21 @@ def create_annotation_paths(center: str, workdir: str) -> namedtuple:
merged_maf_path=os.path.join(
output_files_dir, f"data_mutations_extended_{center}.txt"
),
full_maf_path=os.path.join(
workdir, center, "staging", f"data_mutations_extended_{center}.txt"
),
narrow_maf_path=os.path.join(
workdir,
center,
"staging",
f"data_mutations_extended_{center}_MAF_narrow.txt",
),
full_error_report_path=os.path.join(
workdir,
center,
"staging",
f"failed_annotations_error_report.txt",
),
)
return annotation_paths

Expand Down Expand Up @@ -299,23 +330,52 @@ def concat_annotation_error_reports(
return full_error_report


def check_annotation_error_reports(
    syn: Synapse, maf_table_synid: str, full_error_report: pd.DataFrame, center: str
) -> None:
    """QC check that the Genome Nexus failed-annotations error report agrees
    with the failed annotations recorded in the processed maf table.

    The maf table is queried for the rows of ``center`` whose
    ``Annotation_Status`` is ``FAILED``; the number of rows returned must
    equal the number of rows in ``full_error_report``.

    Args:
        syn (Synapse): synapse client
        maf_table_synid (str): synapse_id of the narrow maf table
        full_error_report (pd.DataFrame): the failed annotations error report
        center (str): the center this is for

    Raises:
        AssertionError: if the number of failed rows in the maf table differs
            from the number of rows in the error report
    """
    failed_annotations_query = (
        f"SELECT * FROM {maf_table_synid} "
        f"WHERE Center = '{center}' AND "
        "Annotation_Status = 'FAILED'"
    )
    maf_table_df = extract.get_syntabledf(
        syn=syn, query_string=failed_annotations_query
    )
    # Raise explicitly rather than relying on an ``assert`` statement:
    # asserts are stripped when Python runs with -O, which would silently
    # turn this QC check into a no-op.
    if len(maf_table_df) != len(full_error_report):
        # NOTE: the two message pieces are concatenated without a separating
        # space; the text is kept unchanged because the unit tests match the
        # exact message.
        raise AssertionError(
            "Genome nexus's failed annotations error report rows doesn't match"
            f"maf table's failed annotations for {center}"
        )


def store_annotation_error_reports(
    full_error_report: pd.DataFrame,
    full_error_report_path: str,
    syn: Synapse,
    errors_folder_synid: str,
) -> None:
    """Writes the annotation error report to a flat file and stores it in synapse

    The report is written once, tab-separated and without the index, to
    ``full_error_report_path``; that same file is then uploaded with
    ``load.store_file`` under ``errors_folder_synid``.

    Args:
        full_error_report (pd.DataFrame): full error report to store
        full_error_report_path (str): where to store the flat file of the
            full error report
        syn (synapseclient.Synapse): synapse client object
        errors_folder_synid (str): synapse id of error report folder
            to store reports in
    """
    # Write to the caller-supplied path only; the file written here is the
    # one that gets uploaded below.
    full_error_report.to_csv(full_error_report_path, sep="\t", index=False)
    load.store_file(
        syn=syn,
        filepath=full_error_report_path,
        parentid=errors_folder_synid,
    )

Expand Down Expand Up @@ -399,17 +459,16 @@ def split_and_store_maf(
syn: Synapse,
center: str,
maf_tableid: str,
annotated_maf_path: str,
annotation_paths: namedtuple,
flatfiles_synid: str,
workdir: str,
):
"""Separates annotated maf file into narrow and full maf and stores them
Args:
syn: Synapse connection
center: Center
maf_tableid: Mutation table synapse id
annotated_maf_path: Annotated maf
annotation_paths: filepaths in the annotation process
flatfiles_synid: GENIE flat files folder
"""
Expand All @@ -418,22 +477,19 @@ def split_and_store_maf(
for col in syn.getTableColumns(maf_tableid)
if col["name"] != "inBED"
]
full_maf_path = os.path.join(
workdir, center, "staging", f"data_mutations_extended_{center}.txt"
)
narrow_maf_path = os.path.join(
workdir, center, "staging", f"data_mutations_extended_{center}_MAF_narrow.txt"
)
maf_chunks = pd.read_csv(
annotated_maf_path, sep="\t", chunksize=100000, comment="#"
annotation_paths.merged_maf_path, sep="\t", chunksize=100000, comment="#"
)

for maf_chunk in maf_chunks:
maf_chunk = format_maf(maf_chunk, center)
append_or_createdf(maf_chunk, full_maf_path)
append_or_createdf(maf_chunk, annotation_paths.full_maf_path)
narrow_maf_chunk = maf_chunk[narrow_maf_cols]
append_or_createdf(narrow_maf_chunk, narrow_maf_path)
append_or_createdf(narrow_maf_chunk, annotation_paths.narrow_maf_path)

load.store_table(syn=syn, filepath=narrow_maf_path, tableid=maf_tableid)
load.store_table(
syn=syn, filepath=annotation_paths.narrow_maf_path, tableid=maf_tableid
)
# Store MAF flat file into synapse
load.store_file(syn=syn, filepath=full_maf_path, parentid=flatfiles_synid)
load.store_file(
syn=syn, filepath=annotation_paths.full_maf_path, parentid=flatfiles_synid
)
84 changes: 72 additions & 12 deletions tests/test_process_mutation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import pytest
import synapseclient

from genie import load, process_mutation
from genie import extract, load, process_mutation


def test_format_maf():
Expand Down Expand Up @@ -102,13 +102,24 @@ def test_move_mutation_maf(self):
def annotation_paths():
    """Yield a ``Filepaths`` namedtuple of sample annotation paths for the
    SAGE center, for use by the mutation-processing tests.

    The tuple carries the seven fields the tests read: input/output/error
    directories, the merged maf path, the narrow and full maf paths and the
    full error report path.
    """
    # A single field list: namedtuple accepts exactly one positional
    # ``field_names`` argument.
    Filepaths = namedtuple(
        "Filepaths",
        [
            "input_files_dir",
            "output_files_dir",
            "error_dir",
            "merged_maf_path",
            "narrow_maf_path",
            "full_maf_path",
            "full_error_report_path",
        ],
    )
    yield Filepaths(
        input_files_dir="input/dir",
        output_files_dir="input/dir",
        error_dir="input/dir/SAGE_error_reports",
        merged_maf_path="input/dir/data_mutations_extended_SAGE.txt",
        narrow_maf_path="input/SAGE/staging/data_mutations_extended_SAGE_MAF_narrow.txt",
        full_maf_path="input/SAGE/staging/data_mutations_extended_SAGE.txt",
        full_error_report_path="input/SAGE/staging/failed_annotations_report.txt",
    )


Expand All @@ -134,6 +145,7 @@ def test_process_mutation_workflow(syn, genie_config, annotation_paths):
]
center = "SAGE"
workdir = "working/dir/path"
maf_table_id = "syn22493903"
sample_error_report = pd.DataFrame({"col1": [1, 2, 3], "col2": [2, 3, 4]})
with patch.object(
process_mutation, "create_annotation_paths", return_value=annotation_paths
Expand All @@ -146,6 +158,8 @@ def test_process_mutation_workflow(syn, genie_config, annotation_paths):
"concat_annotation_error_reports",
return_value=sample_error_report,
) as patch_concat_error, patch.object(
process_mutation, "check_annotation_error_reports"
) as patch_check_error, patch.object(
process_mutation, "store_annotation_error_reports"
) as patch_store_error:
maf = process_mutation.process_mutation_workflow(
Expand All @@ -162,15 +176,20 @@ def test_process_mutation_workflow(syn, genie_config, annotation_paths):
patch_split.assert_called_once_with(
syn=syn,
center=center,
maf_tableid="syn22493903",
annotated_maf_path=annotation_paths.merged_maf_path,
maf_tableid=maf_table_id,
annotated_maf_path=annotation_paths,
flatfiles_synid="syn12279903",
workdir=workdir,
)
patch_concat_error.assert_called_once_with(
center=center,
input_dir=annotation_paths.error_dir,
)
patch_check_error.assert_called_once_with(
syn=syn,
maf_table_synid=maf_table_id,
full_error_report=sample_error_report,
center=center,
)
patch_store_error.assert_called_once_with(
full_error_report=sample_error_report,
syn=syn,
Expand Down Expand Up @@ -224,6 +243,46 @@ def test_error_report(self):
}
)

def test_check_annotation_error_reports_passes_if_match(
    self, syn, test_error_report
):
    """The check raises nothing when the mocked maf table query returns the
    same frame as the error report, and the table is queried exactly once
    with the expected failed-annotations query."""
    maf_table_synid = "synZZZZ"
    expected_query = (
        f"SELECT * FROM {maf_table_synid} "
        "WHERE Center = 'SAGE' AND "
        "Annotation_Status = 'FAILED'"
    )
    with patch.object(
        extract, "get_syntabledf", return_value=test_error_report
    ) as patch_get_syntabledf:
        process_mutation.check_annotation_error_reports(
            syn=syn,
            maf_table_synid=maf_table_synid,
            full_error_report=test_error_report,
            center="SAGE",
        )
        patch_get_syntabledf.assert_called_once_with(
            syn=syn,
            query_string=expected_query,
        )

def test_check_annotation_error_reports_throws_assertion_if_no_match(
    self, syn, test_error_report
):
    """The check is expected to raise AssertionError, with the documented
    message for SAGE, when the mocked maf table query returns a one-row
    frame instead of the error report fixture."""
    mocked_maf_table = pd.DataFrame({"test": [1], "test2": [2]})
    expected_message = (
        "Genome nexus's failed annotations error report rows doesn't match"
        "maf table's failed annotations for SAGE"
    )
    with patch.object(
        extract, "get_syntabledf", return_value=mocked_maf_table
    ), pytest.raises(AssertionError, match=expected_message):
        process_mutation.check_annotation_error_reports(
            syn=syn,
            maf_table_synid="synZZZZ",
            full_error_report=test_error_report,
            center="SAGE",
        )

def test_concat_annotation_error_reports_returns_expected(self, test_error_report):
compiled_report = process_mutation.concat_annotation_error_reports(
input_dir="source_test_directory",
Expand All @@ -236,15 +295,17 @@ def test_concat_annotation_error_reports_returns_expected(self, test_error_repor

def test_store_annotation_error_reports(self, syn, test_error_report):
    """The report is written to the supplied path and that same path is the
    one handed to ``load.store_file`` for upload."""
    errors_folder_synid = "syn11111"
    full_error_report_path = "test.tsv"
    # Patch to_csv so the test does not leave a real ``test.tsv`` behind in
    # the working directory.
    with patch.object(pd.DataFrame, "to_csv") as patch_tocsv, patch.object(
        load, "store_file", return_value=None
    ) as patch_store:
        process_mutation.store_annotation_error_reports(
            full_error_report=test_error_report,
            full_error_report_path=full_error_report_path,
            syn=syn,
            errors_folder_synid=errors_folder_synid,
        )
        patch_tocsv.assert_called_once_with(
            full_error_report_path, sep="\t", index=False
        )
        patch_store.assert_called_once_with(
            syn=syn,
            filepath=full_error_report_path,
            parentid=errors_folder_synid,
        )

Expand Down Expand Up @@ -315,7 +376,7 @@ def test_append_or_createdf_create_file_0size():
patch_tocsv.assert_called_once_with(temp_file.name, sep="\t", index=False)


def test_split_and_store_maf(syn):
def test_split_and_store_maf(syn, annotation_paths):
"""Integration test, check splitting and storing of maf functions are
called"""
# getTableColumns
Expand Down Expand Up @@ -352,21 +413,20 @@ def test_split_and_store_maf(syn):
syn=syn,
center=center,
maf_tableid="sy12345",
annotated_maf_path=annotated_maf_path,
annotation_paths=annotation_paths,
flatfiles_synid="syn2345",
workdir="workdir/path",
)
patch_getcols.assert_called_once_with("sy12345")
patch_readcsv.assert_called_once_with(
annotated_maf_path, sep="\t", chunksize=100000, comment="#"
annotation_paths.merged_maf_path, sep="\t", chunksize=100000, comment="#"
)
patch_format.assert_called_once_with(exampledf, center)

assert patch_append.call_count == 2

patch_store_table.assert_called_once_with(
syn=syn, filepath=narrow_maf_path, tableid="sy12345"
syn=syn, filepath=annotation_paths.narrow_maf_path, tableid="sy12345"
)
patch_store_file.assert_called_once_with(
syn=syn, filepath=full_maf_path, parentid="syn2345"
syn=syn, filepath=annotation_paths.full_maf_path, parentid="syn2345"
)

0 comments on commit 80367db

Please sign in to comment.