Skip to content

Commit

Permalink
initial commit to concat and store failed annotations report
Browse files Browse the repository at this point in the history
  • Loading branch information
rxu17 committed Jan 9, 2024
1 parent 2267c4a commit 9411288
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 47 deletions.
126 changes: 105 additions & 21 deletions genie/process_mutation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Process mutation files
TODO deprecate this module and spread functions around"""
from collections import namedtuple
import logging
import os
import shutil
Expand Down Expand Up @@ -211,12 +212,12 @@ def process_mutation_workflow(
downloadLocation=genie_config["genie_annotation_pkg"],
version=13, # TODO: This should pull from a config file in the future
)

annotated_maf_path = annotate_mutation(
annotation_paths = create_annotation_paths(center=center, workdir=workdir)
annotate_mutation(
annotation_paths=annotation_paths,
center=center,
mutation_files=valid_mutation_files,
genie_annotation_pkg=genie_config["genie_annotation_pkg"],
workdir=workdir,
)

maf_tableid = genie_config["vcf2maf"]
Expand All @@ -226,18 +227,110 @@ def process_mutation_workflow(
syn=syn,
center=center,
maf_tableid=maf_tableid,
annotated_maf_path=annotated_maf_path,
annotated_maf_path=annotation_paths.merged_maf_path,
flatfiles_synid=flatfiles_synid,
workdir=workdir,
)

return annotated_maf_path
full_error_report = concat_annotation_error_reports(
center=center,
input_dir=annotation_paths.error_dir,
)

store_annotation_error_reports(
full_error_report=full_error_report,
syn=syn,
errors_folder_synid=genie_config["center_config"][center]["errorsSynId"],
release="test",
)
return annotation_paths.merged_maf_path


def create_annotation_paths(center: str, workdir: str) -> namedtuple:
"""Creates the filepaths required in the annotation process
Args:
center (str): name of the center
workdir (str): work directory to create paths in
Returns:
namedtuple: tuple with all the paths
"""
input_files_dir = tempfile.mkdtemp(dir=workdir)
output_files_dir = tempfile.mkdtemp(dir=workdir)
Filepaths = namedtuple(
"Filepaths",
["input_files_dir", "output_files_dir", "error_dir", "merged_maf_path"],
)
annotation_paths = Filepaths(
input_files_dir=input_files_dir,
output_files_dir=output_files_dir,
error_dir=os.path.join(output_files_dir, f"{center}_error_reports"),
merged_maf_path=os.path.join(
output_files_dir, f"data_mutations_extended_{center}.txt"
),
)
return annotation_paths


def concat_annotation_error_reports(
center: str,
input_dir: str,
) -> pd.DataFrame:
"""Concatenates the annotation error reports
Args:
center (str): name of center associated with error report
input_dir (str): directory where error reports are
Returns:
pd.DataFrame: full annotation error report
"""
error_files = os.listdir(input_dir)
chunk_size = 10000
error_reports = []

# Read and concatenate TSV files in chunks
for file in error_files:
for chunk in pd.read_csv(
os.path.join(input_dir, file), sep="\t", chunksize=chunk_size
):
error_reports.append(chunk)
full_error_report = pd.concat(error_reports)
full_error_report["Center"] = center
return full_error_report


def store_annotation_error_reports(
full_error_report: pd.DataFrame,
syn: Synapse,
errors_folder_synid: str,
release: str,
) -> None:
"""Stores the annotation error reports to synapse
Args:
full_error_report (pd.DataFrame): full error report to store
syn (synapseclient.Synapse): synapse client object
errors_folder_synid (str): synapse id of error report folder
to store reports in
release (str): name of the release for this processing run
"""
full_error_report.to_csv("failed_annotations_report.tsv", sep="\t", index=False)
load.store_file(
syn=syn,
filepath="failed_annotations_report.tsv",
parentid=errors_folder_synid,
version_comment=release,
)


# TODO: add to transform
def annotate_mutation(
center: str, mutation_files: list, genie_annotation_pkg: str, workdir: str
) -> str:
annotation_paths: namedtuple,
mutation_files: list,
genie_annotation_pkg: str,
center: str,
) -> None:
"""Process vcf/maf files
Args:
Expand All @@ -248,32 +341,23 @@ def annotate_mutation(
Returns:
Path to final maf
"""
input_files_dir = tempfile.mkdtemp(dir=workdir)
output_files_dir = tempfile.mkdtemp(dir=workdir)
error_dir = os.path.join(output_files_dir, f"{center}_error_reports")

for mutation_file in mutation_files:
move_mutation(mutation_file, input_files_dir)
move_mutation(mutation_file, annotation_paths.input_files_dir)

merged_maf_path = os.path.join(
output_files_dir, f"data_mutations_extended_{center}.txt"
)
annotater_cmd = [
"bash",
os.path.join(genie_annotation_pkg, "annotation_suite_wrapper.sh"),
f"-i={input_files_dir}",
f"-o={output_files_dir}",
f"-e={error_dir}",
f"-m={merged_maf_path}",
f"-i={annotation_paths.input_files_dir}",
f"-o={annotation_paths.output_files_dir}",
f"-e={annotation_paths.error_dir}",
f"-m={annotation_paths.merged_maf_path}",
f"-c={center}",
"-s=WXS",
f"-p={genie_annotation_pkg}",
]

subprocess.check_call(annotater_cmd)

return merged_maf_path


# TODO: add to transform
def append_or_createdf(dataframe: pd.DataFrame, filepath: str):
Expand Down
10 changes: 10 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,23 @@ def genie_config():
"center": "SAGE",
"inputSynId": "syn11601335",
"stagingSynId": "syn11601337",
"errorsSynId": "syn53239079",
"release": True,
"mutationInCisFilter": "ON",
},
"TEST": {
"center": "TEST",
"inputSynId": "syn11601340",
"stagingSynId": "syn11601342",
"errorsSynId": "syn53239081",
"release": True,
"mutationInCisFilter": "ON",
},
"GOLD": {
"center": "GOLD",
"inputSynId": "syn52950860",
"stagingSynId": "syn52950861",
"errorsSynId": "syn53239077",
"release": True,
"mutationInCisFilter": "ON",
},
Expand Down
Loading

0 comments on commit 9411288

Please sign in to comment.