From 1bf90ffe9225504d8017ba58325d052dc5e873a3 Mon Sep 17 00:00:00 2001 From: Kevin Schaper Date: Mon, 8 Jul 2024 18:29:19 -0700 Subject: [PATCH 1/5] Starting to mess around with configuring multiple writers --- src/koza/io/writer/tsv_writer.py | 4 ++++ src/koza/model/config/source_config.py | 2 ++ 2 files changed, 6 insertions(+) diff --git a/src/koza/io/writer/tsv_writer.py b/src/koza/io/writer/tsv_writer.py index 5c586bd..bc95334 100644 --- a/src/koza/io/writer/tsv_writer.py +++ b/src/koza/io/writer/tsv_writer.py @@ -27,6 +27,10 @@ def __init__( self.list_delimiter = "|" self.converter = KGXConverter() self.sssom_config = sssom_config + self.split_nodes_on = ["category", "in_taxon_label"] + + self.split_node_fh_map = {} + self.split_edge_fh_map = {} Path(self.dirname).mkdir(parents=True, exist_ok=True) diff --git a/src/koza/model/config/source_config.py b/src/koza/model/config/source_config.py index 8ef42fe..57a41ad 100644 --- a/src/koza/model/config/source_config.py +++ b/src/koza/model/config/source_config.py @@ -180,6 +180,8 @@ class SourceConfig: transform_mode: TransformMode = TransformMode.flat global_table: Optional[Union[str, Dict]] = None local_table: Optional[Union[str, Dict]] = None + split_node_output_on: Optional[List[str]] = None + split_edge_output_on: Optional[List[str]] = None def extract_archive(self): archive_path = Path(self.file_archive).parent # .absolute() From 23500baccbea0145a9268f9a601933fc3f278cc2 Mon Sep 17 00:00:00 2001 From: Harshad Hegde Date: Mon, 15 Jul 2024 17:42:07 -0500 Subject: [PATCH 2/5] undo previous changes --- src/koza/io/writer/tsv_writer.py | 55 +++++++++++++++++--------- src/koza/model/config/source_config.py | 2 - 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/src/koza/io/writer/tsv_writer.py b/src/koza/io/writer/tsv_writer.py index bc95334..fcecdc4 100644 --- a/src/koza/io/writer/tsv_writer.py +++ b/src/koza/io/writer/tsv_writer.py @@ -27,10 +27,6 @@ def __init__( self.list_delimiter = "|" self.converter = KGXConverter() self.sssom_config = sssom_config - self.split_nodes_on = ["category", "in_taxon_label"] - - self.split_node_fh_map = {} - self.split_edge_fh_map = {} Path(self.dirname).mkdir(parents=True, exist_ok=True) @@ -48,40 +44,63 @@ def __init__( self.edgeFH = open(self.edges_file_name, "w") self.edgeFH.write(self.delimiter.join(self.edge_columns) + "\n") - def write(self, entities: Iterable) -> None: + def write(self, entities: Iterable, split: bool = False) -> None: """Write an entities object to separate node and edge .tsv files""" nodes, edges = self.converter.convert(entities) if nodes: for node in nodes: - self.write_row(node, record_type="node") + self.write_row(node, record_type="node", split=split) if edges: for edge in edges: if self.sssom_config: edge = self.sssom_config.apply_mapping(edge) - self.write_row(edge, record_type="edge") - - def write_row(self, record: Dict, record_type: Literal["node", "edge"]) -> None: + self.write_row(edge, record_type="edge", split=split) + + def write_row(self, record: Dict, record_type: Literal["node", "edge"], split: bool = False) -> None: """Write a row to the underlying store. - + Args: record: Dict - A node or edge record record_type: Literal["node", "edge"] - The record_type of record """ + def get_new_fh_path(base_dir, filename, category): + new_dir = base_dir / "splits" + new_dir.mkdir(parents=True, exist_ok=True) + return new_dir / filename.replace(record_type + "s", f"{category}_{record_type}s") + fh = self.nodeFH if record_type == "node" else self.edgeFH columns = self.node_columns if record_type == "node" else self.edge_columns - row = build_export_row(record, list_delimiter=self.list_delimiter) - values = [] - if record_type == "node": - row["id"] = record["id"] - for c in columns: - if c in row: - values.append(str(row[c])) + + if split: + base_dir, filename = Path(fh.name).parent, getattr(self, f"{record_type}s_file_name").name + if record_type == "node": + category = record.get("category", ["UnknownNodeCategory"])[0].split(":")[-1] else: - values.append("") + subject_category = record.get("subject_category", "UnknownSubjectCategory").split(":")[-1] if record.get("subject_category") else "UnknownSubjectCategory" + + object_category = record.get("object_category", "UnknownObjectCategory").split(":")[-1] if record.get("object_category") else "UnknownObjectCategory" + + edge_category = record.get("category", ["UnknownEdgeCategory"])[0].split(":")[-1] if record.get("category") else "UnknownEdgeCategory" + + category = subject_category + edge_category + object_category + + new_fh_path = get_new_fh_path(base_dir, filename, category) + + with open(new_fh_path, "a+") as new_fh: + if new_fh.tell() == 0: # Check if file is empty + new_fh.write(self.delimiter.join(columns) + "\n") + self._write_record_to_file(new_fh, record, columns) + + self._write_record_to_file(fh, record, columns) + + def _write_record_to_file(self, fh, record, columns): + row = build_export_row(record, list_delimiter=self.list_delimiter) + values = [str(row.get(c, "")) for c in columns] fh.write(self.delimiter.join(values) + "\n") + def finalize(self): """Close file handles.""" diff --git a/src/koza/model/config/source_config.py b/src/koza/model/config/source_config.py index 57a41ad..8ef42fe 100644 --- a/src/koza/model/config/source_config.py +++ b/src/koza/model/config/source_config.py @@ -180,8 +180,6 @@ class SourceConfig: transform_mode: TransformMode = TransformMode.flat global_table: Optional[Union[str, Dict]] = None local_table: Optional[Union[str, Dict]] = None - split_node_output_on: Optional[List[str]] = None - split_edge_output_on: Optional[List[str]] = None def extract_archive(self): archive_path = Path(self.file_archive).parent # .absolute() From 11332019a3fd09abf905434516958b0cfb136509 Mon Sep 17 00:00:00 2001 From: Harshad Hegde Date: Mon, 15 Jul 2024 18:00:58 -0500 Subject: [PATCH 3/5] First stab at splitting --- src/koza/app.py | 4 +- src/koza/io/writer/jsonl_writer.py | 4 +- src/koza/io/writer/tsv_writer.py | 40 +++++++++------ tests/unit/test_tsvwriter_node_and_edge.py | 58 ++++++++++++++++++++++ 4 files changed, 88 insertions(+), 18 deletions(-) diff --git a/src/koza/app.py b/src/koza/app.py index b5236a4..d7198d6 100644 --- a/src/koza/app.py +++ b/src/koza/app.py @@ -148,7 +148,7 @@ def next_row(): """ raise NextRowException - def write(self, *entities): + def write(self, *entities, split: bool = False): # If a schema/validator is defined, validate before writing # if self.validate: if hasattr(self, 'schema'): @@ -168,7 +168,7 @@ def write(self, *entities): for edge in edges: validate(instance=edge, target_class=self.edge_type, schema=self.schema, strict=True) - self.writer.write(entities) + self.writer.write(entities, split=split) def _get_writer(self) -> Union[TSVWriter, JSONLWriter]: writer_params = [ diff --git a/src/koza/io/writer/jsonl_writer.py b/src/koza/io/writer/jsonl_writer.py index 2987879..dfaf4db 100644 --- a/src/koza/io/writer/jsonl_writer.py +++ b/src/koza/io/writer/jsonl_writer.py @@ -28,9 +28,9 @@ def __init__( if edge_properties: self.edgeFH = open(f"{output_dir}/{source_name}_edges.jsonl", "w") - def write(self, entities: Iterable): + def write(self, entities: Iterable, split: bool = False) -> None: (nodes, edges) = self.converter.convert(entities) - + # TODO: implement split if nodes: for n in nodes: node = json.dumps(n, ensure_ascii=False) diff --git a/src/koza/io/writer/tsv_writer.py b/src/koza/io/writer/tsv_writer.py index fcecdc4..690826d 100644 --- a/src/koza/io/writer/tsv_writer.py +++ b/src/koza/io/writer/tsv_writer.py @@ -58,49 +58,61 @@ def write(self, entities: Iterable, split: bool = False) -> None: if self.sssom_config: edge = self.sssom_config.apply_mapping(edge) self.write_row(edge, record_type="edge", split=split) - + def write_row(self, record: Dict, record_type: Literal["node", "edge"], split: bool = False) -> None: """Write a row to the underlying store. - + Args: record: Dict - A node or edge record record_type: Literal["node", "edge"] - The record_type of record """ + def get_new_fh_path(base_dir, filename, category): new_dir = base_dir / "splits" new_dir.mkdir(parents=True, exist_ok=True) return new_dir / filename.replace(record_type + "s", f"{category}_{record_type}s") - + fh = self.nodeFH if record_type == "node" else self.edgeFH columns = self.node_columns if record_type == "node" else self.edge_columns - + if split: base_dir, filename = Path(fh.name).parent, getattr(self, f"{record_type}s_file_name").name if record_type == "node": category = record.get("category", ["UnknownNodeCategory"])[0].split(":")[-1] else: - subject_category = record.get("subject_category", "UnknownSubjectCategory").split(":")[-1] if record.get("subject_category") else "UnknownSubjectCategory" - - object_category = record.get("object_category", "UnknownObjectCategory").split(":")[-1] if record.get("object_category") else "UnknownObjectCategory" - - edge_category = record.get("category", ["UnknownEdgeCategory"])[0].split(":")[-1] if record.get("category") else "UnknownEdgeCategory" - + subject_category = ( + record.get("subject_category", "UnknownSubjectCategory").split(":")[-1] + if record.get("subject_category") + else "UnknownSubjectCategory" + ) + + object_category = ( + record.get("object_category", "UnknownObjectCategory").split(":")[-1] + if record.get("object_category") + else "UnknownObjectCategory" + ) + + edge_category = ( + record.get("category", ["UnknownEdgeCategory"])[0].split(":")[-1] + if record.get("category") + else "UnknownEdgeCategory" + ) + category = subject_category + edge_category + object_category - + new_fh_path = get_new_fh_path(base_dir, filename, category) - + with open(new_fh_path, "a+") as new_fh: if new_fh.tell() == 0: # Check if file is empty new_fh.write(self.delimiter.join(columns) + "\n") self._write_record_to_file(new_fh, record, columns) self._write_record_to_file(fh, record, columns) - + def _write_record_to_file(self, fh, record, columns): row = build_export_row(record, list_delimiter=self.list_delimiter) values = [str(row.get(c, "")) for c in columns] fh.write(self.delimiter.join(values) + "\n") - def finalize(self): """Close file handles.""" diff --git a/tests/unit/test_tsvwriter_node_and_edge.py b/tests/unit/test_tsvwriter_node_and_edge.py index b0ac031..c8d7d7f 100644 --- a/tests/unit/test_tsvwriter_node_and_edge.py +++ b/tests/unit/test_tsvwriter_node_and_edge.py @@ -34,3 +34,61 @@ def test_tsv_writer(): assert os.path.exists("{}/{}_nodes.tsv".format(outdir, outfile)) and os.path.exists( "{}/{}_edges.tsv".format(outdir, outfile) ) + + +def test_tsv_writer_split(): + """ + Writes a test tsv file + """ + g1 = Gene(id="HGNC:11603", name="TBX4") + d1 = Disease(id="MONDO:0005002", name="chronic obstructive pulmonary disease") + a1 = GeneToDiseaseAssociation( + id="uuid:5b06e86f-d768-4cd9-ac27-abe31e95ab1e", + subject=g1.id, + object=d1.id, + predicate="biolink:contributes_to", + knowledge_level="not_provided", + agent_type="not_provided", + ) + g2 = Gene(id="HGNC:11604", name="TBX5") + d2 = Disease(id="MONDO:0005003", name="asthma") + a2 = GeneToDiseaseAssociation( + id="uuid:5b06e86f-d768-4cd9-ac27-abe31e95ab1f", + subject=g2.id, + object=d2.id, + predicate="biolink:contributes_to", + knowledge_level="not_provided", + agent_type="not_provided", + ) + g3 = Gene(id="HGNC:11605", name="TBX6") + d3 = Disease(id="MONDO:0005004", name="lung cancer") + a3 = GeneToDiseaseAssociation( + id="uuid:5b06e86f-d768-4cd9-ac27-abe31e95ab1g", + subject=g3.id, + object=d3.id, + predicate="biolink:contributes_to", + knowledge_level="not_provided", + agent_type="not_provided", + ) + ents = [[g1, d1, a1], [g2, d2, a2], [g3, d3, a3]] + + node_properties = ["id", "category", "symbol", "in_taxon", "provided_by", "source"] + edge_properties = ["id", "subject", "predicate", "object", "category" "qualifiers", "publications", "provided_by"] + + outdir = "output/tests/split-examples" + outfile = "tsvwriter" + split_edge_file_substring = "UnknownSubjectCategoryGeneToDiseaseAssociationUnknownObjectCategory" + + t = TSVWriter(outdir, outfile, node_properties, edge_properties) + for ent in ents: + t.write(ent, split=True) + + t.finalize() + + assert os.path.exists("{}/splits/{}_Disease_nodes.tsv".format(outdir, outfile)) + assert os.path.exists("{}/splits/{}_{}_edges.tsv".format(outdir, outfile, split_edge_file_substring)) + assert os.path.exists("{}/splits/{}_Gene_nodes.tsv".format(outdir, outfile)) + + assert os.path.exists("{}/{}_nodes.tsv".format(outdir, outfile)) and os.path.exists( + "{}/{}_edges.tsv".format(outdir, outfile) + ) From 7eb1b67a4c7cec79d6720586c5cec6ab297a0944 Mon Sep 17 00:00:00 2001 From: Harshad Hegde Date: Mon, 15 Jul 2024 18:21:27 -0500 Subject: [PATCH 4/5] manage reruns --- src/koza/io/writer/tsv_writer.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/koza/io/writer/tsv_writer.py b/src/koza/io/writer/tsv_writer.py index 690826d..9e9d6dc 100644 --- a/src/koza/io/writer/tsv_writer.py +++ b/src/koza/io/writer/tsv_writer.py @@ -2,6 +2,7 @@ # NOTE - May want to rename to KGXWriter at some point, if we develop writers for other models non biolink/kgx specific from pathlib import Path +import shutil from typing import Dict, Iterable, List, Literal, Set, Union from ordered_set import OrderedSet @@ -13,6 +14,8 @@ class TSVWriter(KozaWriter): + _splits_cleanup_done = False + def __init__( self, output_dir: Union[str, Path], @@ -69,6 +72,9 @@ def write_row(self, record: Dict, record_type: Literal["node", "edge"], split: b def get_new_fh_path(base_dir, filename, category): new_dir = base_dir / "splits" + if not self._splits_cleanup_done: + shutil.rmtree(new_dir, ignore_errors=True) + self._splits_cleanup_done = True new_dir.mkdir(parents=True, exist_ok=True) return new_dir / filename.replace(record_type + "s", f"{category}_{record_type}s") From 4e0ec24e8d27a1fff2fbc965e758efb49a07e550 Mon Sep 17 00:00:00 2001 From: Harshad Hegde Date: Mon, 15 Jul 2024 21:05:17 -0500 Subject: [PATCH 5/5] added more info to test --- src/koza/io/writer/tsv_writer.py | 14 ++++++------ tests/unit/test_tsvwriter_node_and_edge.py | 25 ++++++++++++++++------ 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/src/koza/io/writer/tsv_writer.py b/src/koza/io/writer/tsv_writer.py index 9e9d6dc..8c1f79a 100644 --- a/src/koza/io/writer/tsv_writer.py +++ b/src/koza/io/writer/tsv_writer.py @@ -84,24 +84,22 @@ def get_new_fh_path(base_dir, filename, category): if split: base_dir, filename = Path(fh.name).parent, getattr(self, f"{record_type}s_file_name").name if record_type == "node": - category = record.get("category", ["UnknownNodeCategory"])[0].split(":")[-1] + category = record.get("category", [""])[0].split(":")[-1] else: subject_category = ( - record.get("subject_category", "UnknownSubjectCategory").split(":")[-1] + record.get("subject_category", "").split(":")[-1] if record.get("subject_category") - else "UnknownSubjectCategory" + else "UnknownCategory" ) object_category = ( - record.get("object_category", "UnknownObjectCategory").split(":")[-1] + record.get("object_category", "").split(":")[-1] if record.get("object_category") - else "UnknownObjectCategory" + else "UnknownCategory" ) edge_category = ( - record.get("category", ["UnknownEdgeCategory"])[0].split(":")[-1] - if record.get("category") - else "UnknownEdgeCategory" + record.get("category", [""])[0].split(":")[-1] if record.get("category") else "UnknownCategory" ) category = subject_category + edge_category + object_category diff --git a/tests/unit/test_tsvwriter_node_and_edge.py b/tests/unit/test_tsvwriter_node_and_edge.py index c8d7d7f..692564f 100644 --- a/tests/unit/test_tsvwriter_node_and_edge.py +++ b/tests/unit/test_tsvwriter_node_and_edge.py @@ -40,8 +40,8 @@ def test_tsv_writer_split(): """ Writes a test tsv file """ - g1 = Gene(id="HGNC:11603", name="TBX4") - d1 = Disease(id="MONDO:0005002", name="chronic obstructive pulmonary disease") + g1 = Gene(id="HGNC:11603", name="TBX4", category=["biolink:Gene"]) + d1 = Disease(id="MONDO:0005002", name="chronic obstructive pulmonary disease", category=["biolink:Disease"]) a1 = GeneToDiseaseAssociation( id="uuid:5b06e86f-d768-4cd9-ac27-abe31e95ab1e", subject=g1.id, @@ -49,8 +49,10 @@ def test_tsv_writer_split(): predicate="biolink:contributes_to", knowledge_level="not_provided", agent_type="not_provided", + subject_category="biolink:Gene", + object_category="biolink:Disease", ) - g2 = Gene(id="HGNC:11604", name="TBX5") + g2 = Gene(id="HGNC:11604", name="TBX5", category=["biolink:Gene"]) d2 = Disease(id="MONDO:0005003", name="asthma") a2 = GeneToDiseaseAssociation( id="uuid:5b06e86f-d768-4cd9-ac27-abe31e95ab1f", @@ -61,7 +63,7 @@ def test_tsv_writer_split(): agent_type="not_provided", ) g3 = Gene(id="HGNC:11605", name="TBX6") - d3 = Disease(id="MONDO:0005004", name="lung cancer") + d3 = Disease(id="MONDO:0005004", name="lung cancer", category=["biolink:Disease"]) a3 = GeneToDiseaseAssociation( id="uuid:5b06e86f-d768-4cd9-ac27-abe31e95ab1g", subject=g3.id, @@ -70,14 +72,25 @@ def test_tsv_writer_split(): knowledge_level="not_provided", agent_type="not_provided", ) - ents = [[g1, d1, a1], [g2, d2, a2], [g3, d3, a3]] + g4 = Gene(id="HGNC:11606", name="TBX7") + d4 = Disease(id="MONDO:0005005", name="pulmonary fibrosis") + a4 = GeneToDiseaseAssociation( + id="uuid:5b06e86f-d768-4cd9-ac27-abe31e95ab1h", + subject=g4.id, + object=d4.id, + predicate="biolink:contributes_to", + knowledge_level="not_provided", + agent_type="not_provided", + ) + + ents = [[g1, d1, a1], [g2, d2, a2], [g3, d3, a3], [g4, d4, a4]] node_properties = ["id", "category", "symbol", "in_taxon", "provided_by", "source"] edge_properties = ["id", "subject", "predicate", "object", "category" "qualifiers", "publications", "provided_by"] outdir = "output/tests/split-examples" outfile = "tsvwriter" - split_edge_file_substring = "UnknownSubjectCategoryGeneToDiseaseAssociationUnknownObjectCategory" + split_edge_file_substring = "UnknownCategoryGeneToDiseaseAssociationUnknownCategory" t = TSVWriter(outdir, outfile, node_properties, edge_properties) for ent in ents: