diff --git a/README.md b/README.md
index d55bb12..9c950c6 100644
--- a/README.md
+++ b/README.md
@@ -32,10 +32,10 @@ Provides a python native version of [FHIR-Converter](https://github.com/microsof
The key features are:
-* **Fastish**: Leverages Cython where possible
+* **Fastish**: Minimizes overhead outside the rendering engine
* **Move fast**: Designed to be extensible. Use the thin rendering API or leverage the builtin parts
-* **Easy**: Designed to be easy to use, extend and deploy.
-* **Robust**: Get production-ready code.
+* **Easy**: Designed to be easy to use, extend and deploy
+* **Robust**: Get production-ready code
Limitations:
* **Only CDA->FHIR** is currently builtin. Additional work is needed to implement the filters, etc. to support FHIR->FHIR and HL7v2->FHIR and back.
@@ -80,15 +80,13 @@ $ pip install python-fhir-converter
## Basic Usage
-See [examples](./scripts/examples.py) for more indepth usage / usecases.
+See [examples](https://github.com/chaseastewart/fhir-converter/blob/main/scripts/examples.py) for more in-depth usage / use cases.
```python
from fhir_converter.renderers import CcdaRenderer
-# Render the file to string using the rendering defaults
with open("data/sample/ccda/ccd.ccda") as xml_in:
-    # indent is provided, any other kwargs supported by dump may be provided
-    print(CcdaRenderer().render_fhir_string("CCD", xml_in, indent=1))
+    print(CcdaRenderer().render_fhir_string("CCD", xml_in))
```
## Command line interface
@@ -119,23 +117,22 @@ Final Memory: 37M
## Templates
-Templates can be loaded from any python-liquid supported mechanism. To make packaging easier a ResourceLoader is provided. When a rendering environment is not provided, templates will be loaded from the [module](/fhir_converter/templates/). To ease the creation of user defined templates a TemplateSystemLoader is provided that allows templates to be loaded from a primary and optionally default location. This allows user defined templates to reference templates in the default location. The example user defined [templates](data/templates/ccda) reuse the default section / header templates.
+Templates can be loaded from any mechanism supported by python-liquid. To make packaging easier, a [ResourceLoader](https://github.com/chaseastewart/fhir-converter/blob/main/fhir_converter/loaders.py#L119) is provided. When a rendering environment is not provided, templates will be loaded from the module [resources](https://github.com/chaseastewart/fhir-converter/tree/main/fhir_converter/templates/ccda). To ease the creation of user-defined templates, a [TemplateSystemLoader](https://github.com/chaseastewart/fhir-converter/blob/main/fhir_converter/loaders.py#L21) is provided that allows templates to be loaded from a primary and, optionally, a default location. This allows user-defined templates to reference templates in the default location. The example user-defined [templates](https://github.com/chaseastewart/fhir-converter/tree/main/data/templates/ccda) reuse the default section / header templates.
## Benchmark
-You can run the [benchmark](./scripts/benchmark.py) from the root of the source tree. Test rig is a 14-inch, 2021 Macbook Pro with the binned M1 PRO not in low power mode.
+You can run the [benchmark](https://github.com/chaseastewart/fhir-converter/blob/main/scripts/benchmark.py) from the root of the source tree. The test rig is a 16-inch, 2023 MacBook Pro with the M3 Pro, not in low power mode. Python version is 3.12.1.
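As a rough sketch, the benchmark is run straight from the repository root; this assumes the script's usual `if __name__ == "__main__"` entry point and that the package plus the `data/` samples are available locally, and the exact command is not prescribed by this change:
```bash
$ python scripts/benchmark.py
```
The profile below, which the script writes to `stats.log`, comes from such a run: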
```text Ordered by: cumulative time ncalls tottime percall cumtime percall filename:lineno(function) - 3 0.000 0.000 16.998 5.666 ../scripts/benchmark.py:75(render_samples) - 22 0.003 0.000 16.997 0.773 ../fhir-converter/fhir_converter/renderers.py:187(render_files_to_dir) - 484 0.002 0.000 16.968 0.035 ../fhir-converter/fhir_converter/renderers.py:220(render_to_dir) - 484 0.010 0.000 16.842 0.035 ../fhir-converter/fhir_converter/renderers.py:93(render_fhir) - 484 0.003 0.000 14.674 0.030 ../fhir-converter/fhir_converter/renderers.py:117(render_to_fhir) + 3 0.000 0.000 12.273 4.091 ./scripts/benchmark.py:75(render_samples) + 22 0.003 0.000 12.272 0.558 ./fhir-converter/fhir_converter/renderers.py:187(render_files_to_dir) + 484 0.002 0.000 12.258 0.025 ./fhir-converter/fhir_converter/renderers.py:220(render_to_dir) + 484 0.010 0.000 12.172 0.025 ./fhir-converter/fhir_converter/renderers.py:93(render_fhir) + 484 0.003 0.000 12.004 0.025 ./fhir-converter/fhir_converter/renderers.py:117(render_to_fhir) ``` -The test fixture profiles the converter using a single thread. The samples are rendered using all of the builtin templates along with the handful of user defined templates. The percall time is relative to the rendering template being used, the number of files being rendered (there is some warm up) and the size of the files to be rendered. In a 60 minute period in similar conditions a little over 100K CDA documents could be rendered into FHIR bundles. Note: including the original CDA document in the bundle as a DocumentReference adds noticable overhead to the render. Omitting this via a user defined template is recommended if this is not required for your usecase. ## Related Projects diff --git a/fhir_converter/__main__.py b/fhir_converter/__main__.py index 73bdff7..2eb8810 100644 --- a/fhir_converter/__main__.py +++ b/fhir_converter/__main__.py @@ -1,7 +1,7 @@ import argparse import os import sys -from collections.abc import Mapping, Sequence +from collections.abc import Sequence from datetime import datetime from functools import partial from pathlib import Path @@ -9,7 +9,7 @@ from textwrap import dedent, indent from time import time from traceback import print_exception -from typing import Any, Optional +from typing import Optional from liquid import Environment from psutil import Process @@ -24,7 +24,7 @@ render_files_to_dir, render_to_dir, ) -from fhir_converter.utils import mkdir_if_not_exists, rmdir_if_empty +from fhir_converter.utils import mkdir, rmdir_if_empty def main(argv: Sequence[str], prog: Optional[str] = None) -> None: @@ -61,7 +61,6 @@ def get_renderer(args: argparse.Namespace) -> DataRenderer: return partial( CcdaRenderer(get_user_defined_environment(args)).render_fhir, args.template_name, - **get_user_defined_options(args), ) @@ -73,15 +72,8 @@ def get_user_defined_environment(args: argparse.Namespace) -> Optional[Environme return None -def get_user_defined_options(args: argparse.Namespace) -> Mapping[str, Any]: - options = {} - if args.indent: - options["indent"] = args.indent - return options - - def render(render: DataRenderer, args: argparse.Namespace) -> None: - to_dir_created = mkdir_if_not_exists(args.to_dir) + to_dir_created = mkdir(args.to_dir) try: if args.from_dir: render_files_to_dir( @@ -163,12 +155,6 @@ def get_argparser(prog: Optional[str] = None) -> argparse.ArgumentParser: help="The liquid template to use when rendering the file", required=True, ) - parser.add_argument( - "--indent", - type=int, - metavar="", - help="The indentation amount or level. 
0 is none.", - ) parser.add_argument( "--continue_on_error", action="store_true", diff --git a/fhir_converter/filters.py b/fhir_converter/filters.py index 8539508..f095307 100644 --- a/fhir_converter/filters.py +++ b/fhir_converter/filters.py @@ -19,15 +19,13 @@ with_context, ) from liquid.undefined import Undefined -from pyjson5 import dumps as json5_dumps +from pyjson5 import dumps as json_dumps from fhir_converter.hl7 import ( Hl7DtmPrecision, - get_ccda_components, - get_ccda_section_template_ids, + get_ccda_section, get_template_id_key, hl7_to_fhir_dtm, - is_template_id, to_fhir_dtm, ) from fhir_converter.utils import to_list @@ -59,7 +57,7 @@ def wrapper(val: object, *args: Any, **kwargs: Any) -> Any: def to_json_string(data: Any) -> str: if isinstance(data, Undefined) or not data: return "" - return json5_dumps(data) + return json_dumps(data) @liquid_filter @@ -132,41 +130,27 @@ def get_property( @mapping_filter -def get_first_ccda_sections_by_template_id(data: Mapping, template_ids: Any) -> Mapping: +def get_first_ccda_sections_by_template_id(msg: Mapping, template_ids: Any) -> Mapping: sections, search_template_ids = {}, list( filter(None, str_arg(template_ids).split("|")) ) - if search_template_ids and data: - components = get_ccda_components(data) - if components: - for template_id in search_template_ids: - template_id_key = get_template_id_key(template_id) - for component in components: - for id in get_ccda_section_template_ids(component): - if is_template_id(id, template_id): - sections[template_id_key] = component["section"] - break - if template_id_key in sections: - break + for template_id in search_template_ids: + section = get_ccda_section(msg, search_template_ids=[template_id]) + if section: + sections[get_template_id_key(template_id)] = section return sections @mapping_filter def get_ccda_section_by_template_id( - data: Mapping, template_id: Any, *template_ids: Any + msg: Mapping, template_id: Any, *template_ids: Any ) -> Mapping: search_template_ids = [template_id] if template_ids: search_template_ids += template_ids - search_template_ids = list(filter(None, map(str_arg, flatten(search_template_ids)))) - if search_template_ids and data: - for component in get_ccda_components(data): - for id in get_ccda_section_template_ids(component): - for template_id in search_template_ids: - if is_template_id(id, template_id): - return component["section"] - return {} + section = get_ccda_section(msg, search_template_ids) + return section or {} @with_context diff --git a/fhir_converter/hl7.py b/fhir_converter/hl7.py index 8f2b509..163b622 100644 --- a/fhir_converter/hl7.py +++ b/fhir_converter/hl7.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections.abc import Mapping, MutableMapping, Sequence -from datetime import datetime, timedelta, tzinfo +from datetime import datetime, timedelta, timezone from enum import IntEnum from math import copysign from re import compile as re_compile @@ -10,24 +10,7 @@ from fhir_converter.utils import merge_mappings, parse_json, to_list -DTM_REGEX = re_compile(r"(\d+(?:\.\d+)?)(?:([+-]\d{2})(\d{2}))?") - - -class UTCOffset(tzinfo): - def __init__(self, minutes) -> None: - self.minutes = minutes - - def utcoffset(self, _) -> timedelta: - return timedelta(minutes=self.minutes) - - def tzname(self, _) -> str: - minutes = abs(self.minutes) - return "{0}{1:02}{2:02}".format( - "-" if self.minutes < 0 else "+", minutes // 60, minutes % 60 - ) - - def dst(self, _) -> timedelta: - return timedelta(0) +DTM_REGEX = 
re_compile(r"(\d+(?:\.\d*)?)(?:([+-]\d{2})(\d{2}))?") class FhirDtmPrecision(IntEnum): @@ -57,20 +40,20 @@ class Hl7DtmPrecision(IntEnum): def fhir_precision(self) -> FhirDtmPrecision: return FhirDtmPrecision[self.name] - @classmethod - def from_dtm(cls, dtm: str) -> Hl7DtmPrecision: + @staticmethod + def from_dtm(dtm: str) -> Hl7DtmPrecision: _len = len(dtm) - if _len > Hl7DtmPrecision.SEC: + if _len >= Hl7DtmPrecision.MILLIS: return Hl7DtmPrecision.MILLIS - elif _len > Hl7DtmPrecision.MIN: + elif _len == Hl7DtmPrecision.SEC: return Hl7DtmPrecision.SEC - elif _len > Hl7DtmPrecision.HOUR: + elif _len == Hl7DtmPrecision.MIN: return Hl7DtmPrecision.MIN - elif _len > Hl7DtmPrecision.DAY: + elif _len == Hl7DtmPrecision.HOUR: return Hl7DtmPrecision.HOUR - elif _len > Hl7DtmPrecision.MONTH: + elif _len == Hl7DtmPrecision.DAY: return Hl7DtmPrecision.DAY - elif _len > Hl7DtmPrecision.YEAR: + elif _len == Hl7DtmPrecision.MONTH: return Hl7DtmPrecision.MONTH elif _len == Hl7DtmPrecision.YEAR: return Hl7DtmPrecision.YEAR @@ -93,7 +76,7 @@ def parse_hl7_dtm(hl7_input: str) -> Hl7ParsedDtm: if tzh and tzm: minutes = int(tzh) * 60.0 minutes += copysign(int(tzm), minutes) - tzinfo = UTCOffset(minutes) + tzinfo = timezone(timedelta(minutes=minutes)) else: tzinfo = None @@ -162,16 +145,19 @@ def to_fhir_dtm(dt: datetime, precision: Optional[FhirDtmPrecision] = None) -> s return iso_dtm[: FhirDtmPrecision.YEAR] -def parse_fhir(json_input: str, encoding: str = "utf-8") -> MutableMapping: - json_data = parse_json(json_input, encoding) - unique_entrys: dict[str, dict] = {} - for entry in json_data.get("entry", []): - key = get_fhir_entry_key(entry) - if key in unique_entrys: - merge_mappings(unique_entrys[key], entry) - else: - unique_entrys[key] = entry - json_data["entry"] = list(unique_entrys.values()) +def parse_fhir(json_input: str) -> MutableMapping: + json_data = parse_json(json_input) + if json_data: + entries = to_list(json_data.get("entry", [])) + if len(entries) > 1: + unique_entrys: dict[str, dict] = {} + for entry in entries: + key = get_fhir_entry_key(entry) + if key in unique_entrys: + merge_mappings(unique_entrys[key], entry) + else: + unique_entrys[key] = entry + json_data["entry"] = list(unique_entrys.values()) return json_data @@ -189,16 +175,62 @@ def get_fhir_entry_key(entry: Mapping) -> str: ) -def get_ccda_components(data: Mapping) -> Sequence: +def get_ccda_section( + ccda: Mapping, search_template_ids: Sequence[str] +) -> Optional[Mapping]: + """get_ccda_section Gets the POCD_MT000040.Section + from the ClinicalDocument that matches one of the templateIds + + See https://github.com/HL7/CDA-core-2.0/tree/master/schema + + Arguments: + ccda (Mapping): The ccda document as a map + search_template_ids (Sequence): The templateIds + + Returns: + The section from the document if present + """ + if search_template_ids: + for component in get_ccda_component3(ccda): + for id in get_component3_section_templateId(component): + for template_id in search_template_ids: + if is_template_id(id, template_id): + return component["section"] + return None + + +def get_ccda_component3(ccda: Mapping) -> Sequence: + """get_ccda_component3 Gets the POCD_MT000040.Component3 + from the ClinicalDocument. 
+ + See https://github.com/HL7/CDA-core-2.0/tree/master/schema + + Arguments: + ccda (Mapping): The ccda document as a map + + Returns: + The Component3 elements from the document, otherwise [] + """ return to_list( - data.get("ClinicalDocument", {}) + ccda.get("ClinicalDocument", {}) .get("component", {}) .get("structuredBody", {}) .get("component", []) ) -def get_ccda_section_template_ids(component: Mapping) -> Sequence: +def get_component3_section_templateId(component: Mapping) -> Sequence: + """get_component3_section_template_id Gets the templateId + from the POCD_MT000040.Component3. + + See https://github.com/HL7/CDA-core-2.0/tree/master/schema + + Arguments: + component (Mapping): The component3 as a map + + Returns: + The templateId from the component3, otherwise [] + """ return to_list(component.get("section", {}).get("templateId", [])) @@ -206,5 +238,5 @@ def get_template_id_key(template_id: str) -> str: return re_sub(r"[^A-Za-z0-9]", "_", template_id) -def is_template_id(id: dict, template_id: str) -> bool: +def is_template_id(id: Mapping, template_id: str) -> bool: return template_id == id.get("root", "").strip() diff --git a/fhir_converter/renderers.py b/fhir_converter/renderers.py index c7579cd..1226b30 100644 --- a/fhir_converter/renderers.py +++ b/fhir_converter/renderers.py @@ -1,26 +1,24 @@ from __future__ import annotations -from collections.abc import Callable, Generator, Mapping, MutableMapping +from collections.abc import Callable, Mapping, MutableMapping from io import StringIO -from json import dump as json_dump -from os import remove as os_remove -from os import walk as os_walk from pathlib import Path -from typing import IO, Any, NoReturn, Optional, Union +from typing import IO, Any, NoReturn, Optional, TextIO, Union from frozendict import frozendict from liquid import Environment from liquid.loaders import BaseLoader -from pyjson5 import loads as json5_loads +from pyjson5 import encode_io +from pyjson5 import loads as json_loads from fhir_converter.filters import all_filters, register_filters from fhir_converter.hl7 import parse_fhir from fhir_converter.loaders import TemplateSystemLoader, get_resource_loader, read_text from fhir_converter.tags import all_tags, register_tags -from fhir_converter.utils import parse_xml, remove_empty_dirs +from fhir_converter.utils import apply_dir, mkdir, parse_xml, rm_empty_dirs, rm_path DataInput = Union[str, IO] -DataOutput = IO +DataOutput = TextIO DataRenderer = Callable[[DataInput, DataOutput, str], None] RenderErrorHandler = Callable[[Exception], None] @@ -73,21 +71,15 @@ def __init__( def _make_globals(self, globals: Optional[Mapping[str, Any]]) -> Mapping[str, Any]: template_globals = dict(globals or {}) if "code_mapping" not in template_globals: - value_set = json5_loads( - read_text(self.env, filename="ValueSet/ValueSet.json") - ) + value_set = json_loads(read_text(self.env, filename="ValueSet/ValueSet.json")) template_globals["code_mapping"] = frozendict(value_set.get("Mapping", {})) return frozendict(template_globals) def render_fhir_string( - self, - template_name: str, - xml_in: DataInput, - encoding: str = "utf-8", - **kwargs, + self, template_name: str, xml_in: DataInput, encoding: str = "utf-8" ) -> str: with StringIO() as buffer: - self.render_fhir(template_name, xml_in, buffer, encoding, **kwargs) + self.render_fhir(template_name, xml_in, buffer, encoding) return buffer.getvalue() def render_fhir( @@ -96,10 +88,9 @@ def render_fhir( xml_in: DataInput, fhir_out: DataOutput, encoding: str = "utf-8", - **kwargs, ) 
-> None: """Renders the XML to FHIR writing the generated output to the supplied file - like object. Keyword arguments will be forwarded to the json serializer + like object Args: template_name (str): The rendering template @@ -108,10 +99,10 @@ def render_fhir( encoding (str, optional): The encoding to use when parsing the XML input. Defaults to "utf-8". """ - json_dump( - obj=self.render_to_fhir(template_name, xml_in, encoding), - fp=fhir_out, - **kwargs, + encode_io( + self.render_to_fhir(template_name, xml_in, encoding), + fp=fhir_out, # type: ignore + supply_bytes=False, ) def render_to_fhir( @@ -131,7 +122,6 @@ def render_to_fhir( template = self.env.get_template(template_name, globals=self.template_globals) return parse_fhir( json_input=template.render({"msg": parse_xml(xml_input, encoding)}), - encoding=encoding, ) @@ -194,27 +184,25 @@ def render_files_to_dir( onerror: RenderErrorHandler = fail, path_filter: Optional[Callable[[Path], bool]] = None, ) -> None: - def files_to_render() -> Generator[Path, Any, None]: - for root, _, filenames in os_walk(from_dir, onerror=fail): - for file_path in filter(path_filter, map(Path, filenames)): - yield Path(root).joinpath(file_path) - - try: - for from_file in files_to_render(): - if not flatten and from_dir != from_file.parent: + def render_files(root: Path, _, filenames: list[str]) -> None: + for file_path in filter(path_filter, map(Path, filenames)): + from_file = root.joinpath(file_path) + if not flatten and from_dir != root: to_file_dir = to_dir.joinpath( *[p for p in from_file.parts[:-1] if p not in from_dir.parts] ) - if not to_file_dir.is_dir(): - to_file_dir.mkdir(parents=True, exist_ok=True) + mkdir(to_file_dir, parents=True, exist_ok=True) else: to_file_dir = to_dir render_to_dir(render, from_file, to_file_dir, extension, encoding, onerror) + + try: + apply_dir(render_files, from_dir) except Exception as e: onerror(RenderingError(f"Failed to render {from_dir}", e)) finally: if not flatten: - remove_empty_dirs(to_dir) + rm_empty_dirs(to_dir) def render_to_dir( @@ -232,10 +220,7 @@ def render_to_dir( with open(out_path, "w", encoding=encoding) as data_out: render(data_in, data_out, encoding) except Exception as e: - try: - os_remove(out_path) - except OSError: - pass + rm_path(out_path) raise e except Exception as e: onerror(RenderingError(f"Failed to render {from_file}", e)) diff --git a/fhir_converter/utils.py b/fhir_converter/utils.py index 3de50d6..30444eb 100644 --- a/fhir_converter/utils.py +++ b/fhir_converter/utils.py @@ -1,27 +1,28 @@ -from collections.abc import Callable, Generator, MutableMapping +from collections.abc import Callable, MutableMapping, Sequence +from os import remove as os_remove from os import walk as os_walk from pathlib import Path from re import compile as re_compile from typing import IO, Any, Union -from pyjson5 import loads as json5_loads +from pyjson5 import loads as json_loads from xmltodict import parse as xmltodict_parse line_endings_regex = re_compile(r"\r\n?|\n") -def apply( - data: MutableMapping, func: Callable[[MutableMapping, tuple], None] +def apply_mapping( + func: Callable[[MutableMapping, tuple], None], data: MutableMapping ) -> MutableMapping: for key in set(data.keys()): val = data[key] if isinstance(val, MutableMapping): - apply(val, func) + apply_mapping(func, val) elif isinstance(val, list): new_list = [] for el in val: if isinstance(el, MutableMapping): - apply(el, func) + apply_mapping(func, el) if el: new_list.append(el) if new_list: @@ -36,15 +37,6 @@ def apply( return data -def 
remove_null_empty(data: MutableMapping) -> MutableMapping: - def _remove_null_empty(d: MutableMapping, key_val: tuple) -> None: - key, val = key_val - if not val: - del d[key] - - return apply(data, _remove_null_empty) - - def merge_mappings(a: MutableMapping, b: MutableMapping) -> MutableMapping: for bk, bv in b.items(): if bk not in a: @@ -63,17 +55,20 @@ def merge_mappings(a: MutableMapping, b: MutableMapping) -> MutableMapping: def to_list(obj: Any) -> list: - if obj is None: + if obj is None or not obj: return [] elif isinstance(obj, list): return obj return [obj] -def parse_json(json_input: str, encoding: str = "utf-8") -> MutableMapping: - return remove_null_empty( - json5_loads(json_input.strip(), encoding=encoding), - ) +def parse_json(json_input: str) -> MutableMapping: + def remove_null_empty(json: MutableMapping, key_val: tuple) -> None: + key, val = key_val + if not val: + del json[key] + + return apply_mapping(remove_null_empty, json_loads(json_input.strip())) def parse_xml(xml_input: Union[str, IO], encoding: str = "utf-8") -> MutableMapping: @@ -99,28 +94,42 @@ def parse_xml(xml_input: Union[str, IO], encoding: str = "utf-8") -> MutableMapp return data -def remove_empty_dirs(parent: Path) -> None: - def empty_dirs() -> Generator[Path, Any, None]: - for root, dirs, filenames in os_walk(parent): - if not dirs and not filenames: - dir = Path(root) - if dir != parent: - yield dir +def rm_empty_dirs(parent: Path) -> None: + def rmdir(root: Path, dirs: list[str], filenames: list[str]) -> None: + if root != parent and not dirs and not filenames: + try: + root.rmdir() + except OSError: + pass - for dir in empty_dirs(): - try: - dir.rmdir() - except OSError: - pass + apply_dir(rmdir, parent) -def mkdir_if_not_exists(dir: Path, **kwargs) -> bool: +def rmdir_if_empty(dir: Path) -> None: + if next(dir.iterdir(), None) is None: + dir.rmdir() + + +def rm_path(path: Path) -> None: + try: + os_remove(path) + except OSError: + pass + + +def apply_dir(func: Callable[[Path, list[str], list[str]], None], dir: Path) -> None: + for root, dirs, filenames in os_walk(dir): + func(Path(root), dirs, filenames) + + +def mkdir(dir: Path, **kwargs) -> bool: if not dir.is_dir(): dir.mkdir(**kwargs) return True return False -def rmdir_if_empty(dir: Path) -> None: - if next(dir.iterdir(), None) is None: - dir.rmdir() +def mkdirs(root: Path, dirnames: Sequence[str], **kwargs) -> None: + mkdir(root, **kwargs) + for dirname in dirnames: + mkdir(root.joinpath(dirname), **kwargs) diff --git a/pyproject.toml b/pyproject.toml index a699cd1..44551ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,12 +4,14 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "python-fhir-converter" -version = "0.0.17" +version = "0.0.18" authors = ["Chase Stewart "] description = "Transformation utility to translate data formats into FHIR" readme = "README.md" license = "MIT" +homepage = "https://github.com/chaseastewart/fhir-converter" repository = "https://github.com/chaseastewart/fhir-converter" +documentation = "https://chaseastewart.github.io/fhir-converter/" classifiers = [ "Development Status :: 4 - Beta", "Environment :: Console", diff --git a/scripts/benchmark.py b/scripts/benchmark.py index 400264b..a51495e 100644 --- a/scripts/benchmark.py +++ b/scripts/benchmark.py @@ -37,10 +37,10 @@ def main() -> None: from cProfile import Profile from pstats import SortKey, Stats - utils.mkdir_if_not_exists(data_out_dir) - mkdirs_if_not_exists(data_builtin_dir, builtin_templates) - 
mkdirs_if_not_exists(data_user_defined_dir, user_defined_templates) - mkdirs_if_not_exists(data_all_dir, all_templates) + utils.mkdir(data_out_dir) + utils.mkdirs(data_builtin_dir, builtin_templates) + utils.mkdirs(data_user_defined_dir, user_defined_templates) + utils.mkdirs(data_all_dir, all_templates) before = time.perf_counter_ns() with Profile() as pr: @@ -57,7 +57,7 @@ def main() -> None: templates=user_defined_templates, to_dir=data_user_defined_dir, ) - render_samples(renderer, templates=all_templates, to_dir=data_all_dir, indent=2) + render_samples(renderer, templates=all_templates, to_dir=data_all_dir) with open(data_out_dir.joinpath("stats.log"), "w") as stats_log: Stats(pr, stream=stats_log).sort_stats(SortKey.CUMULATIVE).print_stats() @@ -66,12 +66,6 @@ def main() -> None: ) -def mkdirs_if_not_exists(root: Path, dirnames: Sequence[str]) -> None: - utils.mkdir_if_not_exists(root) - for dirname in dirnames: - utils.mkdir_if_not_exists(root.joinpath(dirname)) - - def render_samples( renderer: renderers.CcdaRenderer, templates: Sequence[str], to_dir: Path, **kwargs ) -> None: diff --git a/scripts/examples.py b/scripts/examples.py index 620e1d3..5e97acf 100644 --- a/scripts/examples.py +++ b/scripts/examples.py @@ -10,7 +10,7 @@ render_files_to_dir, render_to_dir, ) -from fhir_converter.utils import mkdir_if_not_exists +from fhir_converter.utils import mkdir templates_dir, sample_data_dir, data_out_dir = ( Path("data/templates/ccda"), @@ -19,12 +19,11 @@ ) from_file = sample_data_dir.joinpath("CCD.ccda") -mkdir_if_not_exists(data_out_dir) +mkdir(data_out_dir) -# Render the file to string using the rendering defaults indenting the output +# Render the file to string using the rendering defaults with open(from_file) as xml_in: - # indent is provided, any other kwargs supported by dump may be provided - print(CcdaRenderer().render_fhir_string("CCD", xml_in, indent=1)) + print(CcdaRenderer().render_fhir_string("CCD", xml_in)) # Create a renderer that will load the user defined templates into the rendering env renderer = CcdaRenderer( diff --git a/tests/test_filters.py b/tests/test_filters.py index 721b5b6..08fd7fd 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -181,32 +181,20 @@ def test_date(self) -> None: class FormatAsDateTimeTest(TestCase, FilterTest): template = """{{ date | format_as_date_time }}""" - @fixture(autouse=True, scope="function") - def hl7_to_fhir_dtm_mock(self, mocker): - mocked = mocker.patch( - "fhir_converter.filters.hl7_to_fhir_dtm", - return_value="2024-01-10T06:35:57.920Z", - ) - self._hl7_to_fhir_dtm_mock = mocked - return mocked - def setUp(self) -> None: self.setup_template() def test_undefined(self) -> None: result = self.bound_template.render() self.assertEqual(result, "") - self._hl7_to_fhir_dtm_mock.assert_not_called() def test_empty_date(self) -> None: result = self.bound_template.render(date="") self.assertEqual(result, "") - self._hl7_to_fhir_dtm_mock.assert_not_called() def test(self) -> None: result = self.bound_template.render(date="20240110063557.920+0000") self.assertEqual(result, "2024-01-10T06:35:57.920Z") - self._hl7_to_fhir_dtm_mock.assert_called_once_with("20240110063557.920+0000") class NowTest(TestCase, FilterTest): @@ -252,22 +240,22 @@ def test_uuid(self) -> None: class GetPropertTest(TestCase, FilterTest): template = """{{ status | get_property: key, property }}""" - def setUp(self) -> None: - self.setup_template( - globals={ - "code_mapping": { - "RequestStatus": { - "fatal": {"code": "severe", "display": "very 
severe"}, - "retry": {"other": "retrying"}, - "__default__": { - "code": "bad", - "display": "very bad", - "other": "could be worse", - }, - } - } + template_globals: dict = { + "code_mapping": { + "RequestStatus": { + "fatal": {"code": "severe", "display": "very severe"}, + "retry": {"other": "retrying"}, + "__default__": { + "code": "bad", + "display": "very bad", + "other": "could be worse", + }, } - ) + } + } + + def setUp(self) -> None: + self.setup_template(globals=self.template_globals) def test_undefined(self) -> None: result = self.bound_template.render() diff --git a/tests/test_hl7.py b/tests/test_hl7.py new file mode 100644 index 0000000..18aa794 --- /dev/null +++ b/tests/test_hl7.py @@ -0,0 +1,474 @@ +from unittest import TestCase + +from pytest import raises + +from fhir_converter.hl7 import ( + FhirDtmPrecision, + Hl7DtmPrecision, + get_ccda_component3, + get_component3_section_templateId, + get_fhir_entry_key, + get_template_id_key, + hl7_to_fhir_dtm, + is_template_id, + parse_fhir, + parse_hl7_dtm, +) + + +class Hl7DtmPrecisionTest(TestCase): + def test_fhir_precision_year(self) -> None: + self.assertEqual(Hl7DtmPrecision.YEAR.fhir_precision, FhirDtmPrecision.YEAR) + + def test_fhir_precision_month(self) -> None: + self.assertEqual(Hl7DtmPrecision.MONTH.fhir_precision, FhirDtmPrecision.MONTH) + + def test_fhir_precision_day(self) -> None: + self.assertEqual(Hl7DtmPrecision.DAY.fhir_precision, FhirDtmPrecision.DAY) + + def test_fhir_precision_hour(self) -> None: + self.assertEqual(Hl7DtmPrecision.HOUR.fhir_precision, FhirDtmPrecision.HOUR) + + def test_fhir_precision_min(self) -> None: + self.assertEqual(Hl7DtmPrecision.MIN.fhir_precision, FhirDtmPrecision.MIN) + + def test_fhir_precision_sec(self) -> None: + self.assertEqual(Hl7DtmPrecision.SEC.fhir_precision, FhirDtmPrecision.SEC) + + def test_fhir_precision_millis(self) -> None: + self.assertEqual(Hl7DtmPrecision.MILLIS.fhir_precision, FhirDtmPrecision.MILLIS) + + def test_from_dtm_year(self) -> None: + self.assertEqual(Hl7DtmPrecision.YEAR, Hl7DtmPrecision.from_dtm("2024")) + + def test_from_dtm_month(self) -> None: + self.assertEqual(Hl7DtmPrecision.MONTH, Hl7DtmPrecision.from_dtm("202401")) + + def test_from_dtm_day(self) -> None: + self.assertEqual(Hl7DtmPrecision.DAY, Hl7DtmPrecision.from_dtm("20240110")) + + def test_from_dtm_hour(self) -> None: + self.assertEqual(Hl7DtmPrecision.HOUR, Hl7DtmPrecision.from_dtm("2024011006")) + + def test_from_dtm_min(self) -> None: + self.assertEqual(Hl7DtmPrecision.MIN, Hl7DtmPrecision.from_dtm("202401100635")) + + def test_from_dtm_sec(self) -> None: + self.assertEqual(Hl7DtmPrecision.SEC, Hl7DtmPrecision.from_dtm("20240110063557")) + + def test_from_dtm_millis(self) -> None: + self.assertEqual( + Hl7DtmPrecision.MILLIS, Hl7DtmPrecision.from_dtm("20240110063557.920") + ) + + +class FhirDtmPrecisionTest(TestCase): + def test_timespec_year(self) -> None: + self.assertEqual("seconds", FhirDtmPrecision.YEAR.timespec) + + def test_timespec_month(self) -> None: + self.assertEqual("seconds", FhirDtmPrecision.MONTH.timespec) + + def test_timespec_day(self) -> None: + self.assertEqual("seconds", FhirDtmPrecision.DAY.timespec) + + def test_timespec_hour(self) -> None: + self.assertEqual("seconds", FhirDtmPrecision.HOUR.timespec) + + def test_timespec_min(self) -> None: + self.assertEqual("seconds", FhirDtmPrecision.MIN.timespec) + + def test_timespec_sec(self) -> None: + self.assertEqual("seconds", FhirDtmPrecision.SEC.timespec) + + def test_timespec_millis(self) -> None: + 
self.assertEqual("milliseconds", FhirDtmPrecision.MILLIS.timespec) + + +class ParseHl7DtmTest(TestCase): + def test_empty_str(self) -> None: + with raises(ValueError): + parse_hl7_dtm("") + + def test_blank_str(self) -> None: + with raises(ValueError): + parse_hl7_dtm(" ") + + def test_less_than_year(self) -> None: + with raises(ValueError): + parse_hl7_dtm("200") + + def test_less_than_month(self) -> None: + with raises(ValueError): + parse_hl7_dtm("20041") + + def test_less_than_day(self) -> None: + with raises(ValueError): + parse_hl7_dtm("2004101") + + def test_less_than_hour(self) -> None: + with raises(ValueError): + parse_hl7_dtm("200410121") + + def test_less_than_min(self) -> None: + with raises(ValueError): + parse_hl7_dtm("20041012101") + + def test_less_than_sec(self) -> None: + with raises(ValueError): + parse_hl7_dtm("2004101210154") + + def test_less_than_millis(self) -> None: + with raises(ValueError): + parse_hl7_dtm("20041012101545.") + + def test_strip_whitespace(self) -> None: + result = parse_hl7_dtm(" 2024 ") + self.assertEqual(result.precision, Hl7DtmPrecision.YEAR) + self.assertEqual("2024-01-01T00:00:00", result.dt.isoformat()) + + def test_year(self) -> None: + result = parse_hl7_dtm("2024") + self.assertEqual(result.precision, Hl7DtmPrecision.YEAR) + self.assertEqual("2024-01-01T00:00:00", result.dt.isoformat()) + + def test_month(self) -> None: + result = parse_hl7_dtm("202402") + self.assertEqual(result.precision, Hl7DtmPrecision.MONTH) + self.assertEqual("2024-02-01T00:00:00", result.dt.isoformat()) + + def test_day(self) -> None: + result = parse_hl7_dtm("20240210") + self.assertEqual(result.precision, Hl7DtmPrecision.DAY) + self.assertEqual("2024-02-10T00:00:00", result.dt.isoformat()) + + def test_hour(self) -> None: + result = parse_hl7_dtm("2024021006") + self.assertEqual(result.precision, Hl7DtmPrecision.HOUR) + self.assertEqual("2024-02-10T06:00:00", result.dt.isoformat()) + + def test_min(self) -> None: + result = parse_hl7_dtm("202402100635") + self.assertEqual(result.precision, Hl7DtmPrecision.MIN) + self.assertEqual("2024-02-10T06:35:00", result.dt.isoformat()) + + def test_sec(self) -> None: + result = parse_hl7_dtm("20240210063557") + self.assertEqual(result.precision, Hl7DtmPrecision.SEC) + self.assertEqual("2024-02-10T06:35:57", result.dt.isoformat()) + + def test_millis(self) -> None: + result = parse_hl7_dtm("20240210063557.920") + self.assertEqual(result.precision, Hl7DtmPrecision.MILLIS) + self.assertEqual( + "2024-02-10T06:35:57.920", result.dt.isoformat(timespec="milliseconds") + ) + + def test_tz_utc(self) -> None: + result = parse_hl7_dtm("20240210063557.920+0000") + self.assertEqual(result.precision, Hl7DtmPrecision.MILLIS) + self.assertEqual( + "2024-02-10T06:35:57.920+00:00", result.dt.isoformat(timespec="milliseconds") + ) + + def test_tz_plus(self) -> None: + result = parse_hl7_dtm("20240210063557.920+0100") + self.assertEqual(result.precision, Hl7DtmPrecision.MILLIS) + self.assertEqual( + "2024-02-10T06:35:57.920+01:00", result.dt.isoformat(timespec="milliseconds") + ) + + def test_tz_minus(self) -> None: + result = parse_hl7_dtm("20240210063557.920-0100") + self.assertEqual(result.precision, Hl7DtmPrecision.MILLIS) + self.assertEqual( + "2024-02-10T06:35:57.920-01:00", result.dt.isoformat(timespec="milliseconds") + ) + + def test_hour_tz(self) -> None: + result = parse_hl7_dtm("2024021006+0400") + self.assertEqual(result.precision, Hl7DtmPrecision.HOUR) + self.assertEqual("2024-02-10T06:00:00+04:00", result.dt.isoformat()) + + def 
test_min_tz(self) -> None: + result = parse_hl7_dtm("202402100635+0400") + self.assertEqual(result.precision, Hl7DtmPrecision.MIN) + self.assertEqual("2024-02-10T06:35:00+04:00", result.dt.isoformat()) + + +class Hl7ToFhirDtmTest(TestCase): + def test_year(self) -> None: + self.assertEqual("2024", hl7_to_fhir_dtm("2024")) + + def test_month(self) -> None: + self.assertEqual("2024-02", hl7_to_fhir_dtm("202402")) + + def test_month_day(self) -> None: + self.assertEqual("2024-02-10", hl7_to_fhir_dtm("20240210")) + + def test_month_hour(self) -> None: + self.assertEqual("2024-02-10T06:00:00", hl7_to_fhir_dtm("2024021006")) + + def test_month_min(self) -> None: + self.assertEqual("2024-02-10T06:35:00", hl7_to_fhir_dtm("202402100635")) + + def test_month_sec(self) -> None: + self.assertEqual("2024-02-10T06:35:57", hl7_to_fhir_dtm("20240210063557")) + + def test_utc(self) -> None: + self.assertEqual( + "2024-02-10T06:35:57.920Z", hl7_to_fhir_dtm("20240210063557.920+0000") + ) + self.assertEqual( + "2024-02-10T06:35:57.920Z", hl7_to_fhir_dtm("20240210063557.920-0000") + ) + + def test_tz_plus(self) -> None: + self.assertEqual( + "2024-02-10T06:35:57.920+01:00", hl7_to_fhir_dtm("20240210063557.920+0100") + ) + + def test_tz_minus(self) -> None: + self.assertEqual( + "2024-02-10T06:35:57.920-01:00", hl7_to_fhir_dtm("20240210063557.920-0100") + ) + + def test_precision_greater(self) -> None: + res = hl7_to_fhir_dtm("20240210063557.920-0100", precision=Hl7DtmPrecision.DAY) + self.assertEqual("2024-02-10", res) + + def test_precision_less(self) -> None: + res = hl7_to_fhir_dtm("202402", precision=Hl7DtmPrecision.DAY) + self.assertEqual("2024-02", res) + + def test_precision(self) -> None: + res = hl7_to_fhir_dtm("20240210", precision=Hl7DtmPrecision.DAY) + self.assertEqual("2024-02-10", res) + + +class ParseFhirTest(TestCase): + def test_empty(self) -> None: + self.assertEqual({}, parse_fhir("{}")) + + def test_empty_entry(self) -> None: + self.assertEqual( + {"resourceType": "Bundle", "type": "batch"}, + parse_fhir('{"resourceType": "Bundle", "type": "batch", "entry": []}'), + ) + + def test(self) -> None: + fhir_json = { + "resourceType": "Bundle", + "type": "batch", + "entry": [ + { + "fullUrl": "urn:uuid:8c92075f-ae59-6be3-037f", + "resource": { + "resourceType": "Observation", + "id": "8c92075f-ae59-6be3-037f-e2d87e29185a", + "meta": { + "profile": [ + "http://hl7.org/fhir/us/core/StructureDefinition/us-core-observationresults" + ] + }, + "identifier": [ + { + "system": "urn:ietf:rfc:3986", + "value": "urn:uuid:c03e5445-af1b-4911-a419-e2782f21448c", + } + ], + "effectiveDateTime": "2014-10-01T10:30:26-05:00", + "bodySite": { + "coding": [ + { + "code": "302509004", + "display": "Entire Heart", + "system": "http://snomed.info/sct", + } + ] + }, + }, + }, + ], + } + fhir_str = "".join( + [ + '{"resourceType":"Bundle","type":"batch","entry":[', + '{"fullUrl":"urn:uuid:8c92075f-ae59-6be3-037f",', + '"resource":{"resourceType":"Observation",', + '"id":"8c92075f-ae59-6be3-037f-e2d87e29185a","meta":{"profile":', + '["http://hl7.org/fhir/us/core/StructureDefinition/us-core-observationresults"]},', + '"identifier":[{"system":"urn:ietf:rfc:3986","value":"urn:uuid:c03e5445-af1b-4911-a419-e2782f21448c"}]', + '}},{"fullUrl":"urn:uuid:8c92075f-ae59-6be3-037f-e2d87e29185a","resource":{"resourceType":', + '"Observation","id":"8c92075f-ae59-6be3-037f-e2d87e29185a","effectiveDateTime":', + '"2014-10-01T10:30:26-05:00","bodySite":{"coding":[{"code":"302509004","display":', + '"Entire 
Heart","system":"http://snomed.info/sct"}]}}}]}', + ] + ) + self.assertEqual(fhir_json, parse_fhir(fhir_str)) + + +class GetFhirEntryKeyTest(TestCase): + def test_empty(self) -> None: + self.assertEqual("", get_fhir_entry_key({})) + + def test_empty_resource(self) -> None: + self.assertEqual("", get_fhir_entry_key({"resource": {}})) + + def test_basic_fields(self) -> None: + res = get_fhir_entry_key({"resource": {"resourceType": "observation", "id": "1"}}) + self.assertEqual("observation_1", res) + + def test_empty_type(self) -> None: + res = get_fhir_entry_key({"resource": {"resourceType": "", "id": "1"}}) + self.assertEqual("1", res) + + res = get_fhir_entry_key({"resource": {"id": "1"}}) + self.assertEqual("1", res) + + def test_empty_id(self) -> None: + res = get_fhir_entry_key({"resource": {"resourceType": "observation", "id": ""}}) + self.assertEqual("observation", res) + + res = get_fhir_entry_key({"resource": {"resourceType": "observation"}}) + self.assertEqual("observation", res) + + def test_empty_meta(self) -> None: + res = get_fhir_entry_key( + { + "resource": { + "resourceType": "observation", + "id": "1", + "meta": {}, + } + } + ) + self.assertEqual("observation_1", res) + + def test_all_fields(self) -> None: + res = get_fhir_entry_key( + { + "resource": { + "resourceType": "observation", + "id": "1", + "meta": {"versionId": "0"}, + } + } + ) + self.assertEqual("observation_0_1", res) + + +class GetCcdaComponent3Test(TestCase): + def test_empty(self) -> None: + self.assertEqual([], get_ccda_component3({})) + + def test_empty_document(self) -> None: + self.assertEqual([], get_ccda_component3({"ClinicalDocument": {}})) + + def test_empty_component2(self) -> None: + self.assertEqual([], get_ccda_component3({"ClinicalDocument": {"component": {}}})) + + def test_empty_structuredbody(self) -> None: + res = get_ccda_component3( + {"ClinicalDocument": {"component": {"structuredBody": {}}}} + ) + self.assertEqual([], res) + + def test_empty_component3(self) -> None: + res = get_ccda_component3( + {"ClinicalDocument": {"component": {"structuredBody": {"component": {}}}}} + ) + self.assertEqual([], res) + + res = get_ccda_component3( + {"ClinicalDocument": {"component": {"structuredBody": {"component": []}}}} + ) + self.assertEqual([], res) + + def test_component3(self) -> None: + res = get_ccda_component3( + { + "ClinicalDocument": { + "component": { + "structuredBody": {"component": {"templateId": "1.2.3"}} + } + } + } + ) + self.assertEqual([{"templateId": "1.2.3"}], res) + + def test_many_component3(self) -> None: + res = get_ccda_component3( + { + "ClinicalDocument": { + "component": { + "structuredBody": { + "component": [ + {"templateId": "1.2.3"}, + {"templateId": "3.2.1"}, + ] + } + } + } + } + ) + self.assertEqual([{"templateId": "1.2.3"}, {"templateId": "3.2.1"}], res) + + +class GetComponet3SectionTemplateIdTest(TestCase): + def test_empty_component(self) -> None: + self.assertEqual([], get_component3_section_templateId({})) + + def test_empty_section(self) -> None: + self.assertEqual([], get_component3_section_templateId({"section": {}})) + + def test_empty_templateId(self) -> None: + res = get_component3_section_templateId({"section": {"templateId": ""}}) + self.assertEqual([], res) + + def test_templateId(self) -> None: + res = get_component3_section_templateId({"section": {"templateId": "1.2.3"}}) + self.assertEqual(["1.2.3"], res) + + def test_templateId_list(self) -> None: + res = get_component3_section_templateId( + {"section": {"templateId": ["1.2.3", "23.1"]}} + ) + 
self.assertEqual(["1.2.3", "23.1"], res) + + +class GetTemplateIdKeyTest(TestCase): + def test_id_empty(self) -> None: + self.assertEqual("", get_template_id_key("")) + + def test_numbers_letters(self) -> None: + self.assertEqual("043be7ae", get_template_id_key("043be7ae")) + + def test_guid(self) -> None: + self.assertEqual( + "ca8505ac_b18e_11ee_a506_0242ac120002", + get_template_id_key("ca8505ac-b18e-11ee-a506-0242ac120002"), + ) + + def test_id(self) -> None: + self.assertEqual( + "2_16_840_1_113883_10_20_5_4", + get_template_id_key("2.16.840.1.113883.10.20.5.4"), + ) + + +class IsTemplateIdTest(TestCase): + def test_id_empty(self) -> None: + self.assertFalse(is_template_id({}, "1.2.3")) + + def test_id_empty_root(self) -> None: + self.assertFalse(is_template_id({"root": ""}, "1.2.3")) + + def test_equal(self) -> None: + self.assertTrue(is_template_id({"root": "1.2.3"}, "1.2.3")) + + def test_not_equal(self) -> None: + self.assertFalse(is_template_id({"root": "3.2.1"}, "1.2.3")) + + def test_strip_whitespace(self) -> None: + self.assertTrue(is_template_id({"root": " 1.2.3 "}, "1.2.3"))