From da3d4c171ab61a66caa81969f82217b3ee5e7c20 Mon Sep 17 00:00:00 2001 From: Michael Terry Date: Thu, 9 Jan 2025 13:25:48 -0300 Subject: [PATCH] feat: add inlining options and a new `inline` command Inlining here means downloading attachment URLs and insert them as attachment data, allowing you to download the attachment just once, at the cost of extra storage space. This is often useful in preparation for NLP tasks, where you want to often refer back to DocumentReference clinical notes, but do not want to constantly deal with the flakiness of an EHR server. (Or losing access to that server.) New features: - There is a new `inline` command that can inline an existing folder of NDJSON. - The `export` and `etl` commands both now accept an opt-in flag to inline data after performing a bulk export. - If the server provides a Retry-After header that is a timestamp (rather than a number of seconds), we now parse that correctly. - All server requests are retried at least a little bit upon errors. (previously, only bulk export requests were retried - now every time we hit the server, so even for Medication downloads or DocRef attachment downloads during NLP tasks. Behavior changes: - If the server gives us a 429 error, we now log it as an error message, and don't log a successful progress call. - If the server gives us a 429 error, we use the next exponential backoff delay instead of hard coding 60 seconds as the delay. - If the server gives us a Retry-After header on an error message, we no longer unconditionally accept and use it. Rather the requested delay is capped by our next exponential backoff delay. That is, the server's Retry-After time will be used if it's LESS than our next backoff, but if it's longer, we'll still use our own backoff. This is lightly hostile, but (a) it's only done on error cases, (b) our backoff times are generous, and counted in minutes not seconds, and (c) it lets us guarantee a max delay time for callers. - Instead of retrying on 429 and ALL 5xx errors, there's a specific list of error codes that we retry on. Currently it's 408, 429, 500, 502, 503, and 504. - Have the bulk exporter time out after 30 days, rather than one. We've seen Epic exports take a couple weeks. --- .github/workflows/ci.yaml | 2 +- cumulus_etl/__init__.py | 2 +- cumulus_etl/cli.py | 7 +- cumulus_etl/cli_utils.py | 72 +++++ cumulus_etl/common.py | 18 +- cumulus_etl/errors.py | 12 +- cumulus_etl/etl/cli.py | 6 + cumulus_etl/etl/tasks/task_factory.py | 13 +- cumulus_etl/export/cli.py | 6 + cumulus_etl/fhir/__init__.py | 2 + cumulus_etl/fhir/fhir_client.py | 199 ++++++++++---- cumulus_etl/fhir/fhir_utils.py | 34 ++- cumulus_etl/inliner/__init__.py | 4 + cumulus_etl/inliner/cli.py | 61 +++++ cumulus_etl/inliner/inliner.py | 224 ++++++++++++++++ cumulus_etl/inliner/reader.py | 73 +++++ cumulus_etl/inliner/writer.py | 46 ++++ cumulus_etl/loaders/fhir/bulk_export.py | 99 +++---- cumulus_etl/loaders/fhir/ndjson_loader.py | 28 +- cumulus_etl/store.py | 4 - docs/bulk-exports.md | 57 ++++ pyproject.toml | 2 +- .../covid_symptom__nlp_results.000.ndjson | 32 +-- tests/fhir/test_fhir_client.py | 97 +++++-- tests/inliner/__init__.py | 0 tests/inliner/test_inline_cli.py | 82 ++++++ tests/inliner/test_inliner.py | 252 ++++++++++++++++++ tests/inliner/test_reader.py | 99 +++++++ tests/inliner/test_writer.py | 31 +++ tests/loaders/ndjson/test_bulk_export.py | 63 ++--- tests/loaders/ndjson/test_ndjson_loader.py | 29 ++ tests/test_common.py | 9 +- tests/utils.py | 7 + 33 files changed, 1443 insertions(+), 229 deletions(-) create mode 100644 cumulus_etl/inliner/__init__.py create mode 100644 cumulus_etl/inliner/cli.py create mode 100644 cumulus_etl/inliner/inliner.py create mode 100644 cumulus_etl/inliner/reader.py create mode 100644 cumulus_etl/inliner/writer.py create mode 100644 tests/inliner/__init__.py create mode 100644 tests/inliner/test_inline_cli.py create mode 100644 tests/inliner/test_inliner.py create mode 100644 tests/inliner/test_reader.py create mode 100644 tests/inliner/test_writer.py diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7282976c..be2f3df1 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -113,7 +113,7 @@ jobs: # Compare results export OUTDIR=$DATADIR/run-output/covid_symptom__nlp_results sudo chown -R $(id -u) $OUTDIR - sed -i 's/"generated_on": "[^"]*", //g' $OUTDIR/*.ndjson + sed -i 's/"generated_on":"[^"]*",//g' $OUTDIR/*.ndjson diff -upr $DATADIR/expected-output $OUTDIR echo "All Good!" diff --git a/cumulus_etl/__init__.py b/cumulus_etl/__init__.py index edeb3f40..680b9d8f 100644 --- a/cumulus_etl/__init__.py +++ b/cumulus_etl/__init__.py @@ -1,3 +1,3 @@ """Turns FHIR data into de-identified & aggregated records""" -__version__ = "2.0.0" +__version__ = "2.1.0" diff --git a/cumulus_etl/cli.py b/cumulus_etl/cli.py index acc93ae6..59a66bc8 100644 --- a/cumulus_etl/cli.py +++ b/cumulus_etl/cli.py @@ -9,7 +9,7 @@ import rich.logging -from cumulus_etl import common, etl, export, upload_notes +from cumulus_etl import common, etl, export, inliner, upload_notes from cumulus_etl.etl import convert, init @@ -20,6 +20,7 @@ class Command(enum.Enum): ETL = "etl" EXPORT = "export" INIT = "init" + INLINE = "inline" UPLOAD_NOTES = "upload-notes" # Why isn't this part of Enum directly...? @@ -69,13 +70,15 @@ async def main(argv: list[str]) -> None: run_method = export.run_export elif subcommand == Command.INIT.value: run_method = init.run_init + elif subcommand == Command.INLINE.value: + run_method = inliner.run_inline else: parser.description = "Extract, transform, and load FHIR data." if not subcommand: # Add a note about other subcommands we offer, and tell argparse not to wrap our formatting parser.formatter_class = argparse.RawDescriptionHelpFormatter parser.description += "\n\nother commands available:\n" - parser.description += " convert\n export\n init\n upload-notes" + parser.description += " convert\n export\n init\n inline\n upload-notes" run_method = etl.run_etl with tempfile.TemporaryDirectory() as tempdir: diff --git a/cumulus_etl/cli_utils.py b/cumulus_etl/cli_utils.py index 0736fc80..2c7ee0bf 100644 --- a/cumulus_etl/cli_utils.py +++ b/cumulus_etl/cli_utils.py @@ -1,6 +1,7 @@ """Helper methods for CLI parsing.""" import argparse +import itertools import os import socket import tempfile @@ -61,6 +62,24 @@ def add_bulk_export(parser: argparse.ArgumentParser, *, as_subgroup: bool = True "--until", metavar="TIMESTAMP", help="end date for export from the FHIR server" ) parser.add_argument("--resume", metavar="URL", help="polling status URL from a previous export") + parser.add_argument( + "--inline", + action="store_true", + help="attachments will be inlined after the export", + ) + parser.add_argument( + "--inline-resource", + metavar="RESOURCES", + action="append", + help="only consider this resource for inlining (default is all supported inline targets: " + "DiagnosticReport and DocumentReference)", + ) + parser.add_argument( + "--inline-mimetype", + metavar="MIMETYPES", + action="append", + help="only inline this attachment mimetype (default is text, HTML, and XHTML)", + ) return parser @@ -176,3 +195,56 @@ def make_progress_bar() -> rich.progress.Progress: rich.progress.TimeElapsedColumn(), ] return rich.progress.Progress(*columns) + + +def expand_inline_resources(arg: Iterable[str] | None) -> set[str]: + """ + This converts a list of inline resource args into the final properly cased resource names. + + If you have an arg like --inline-resource, this will process that for you. + """ + allowed = {"diagnosticreport": "DiagnosticReport", "documentreference": "DocumentReference"} + + if arg is None: + return set(allowed.values()) + + resources = set(expand_comma_list_arg(arg)) + for resource in resources: + if resource.casefold() not in allowed: + errors.fatal(f"Unsupported resource for inlining: {resource}", errors.ARGS_INVALID) + + return {allowed[resource.casefold()] for resource in resources} + + +def expand_inline_mimetypes(arg: Iterable[str] | None) -> set[str]: + """ + This converts a list of inline mimetype args into a set of normalized mimetypes. + + If you have an arg like --inline-mimetype, this will process that for you. + """ + if arg is None: + return {"text/plain", "text/html", "application/xhtml+xml"} + + return set(expand_comma_list_arg(arg, casefold=True)) + + +def expand_comma_list_arg(arg: Iterable[str] | None, casefold: bool = False) -> Iterable[str]: + """ + This converts a list of string args, splits any strings on commas, and combines results. + + This is useful for CLI arguments with action="append" but you also want to allow comma + separated args. --task does this, as well as others. + + An example CLI: + --task=patient --task=condition,procedure + Would give: + ["patient", "condition,procedure"] + And this method would turn that into: + ["patient", "condition", procedure"] + """ + if arg is None: + return [] + split_args = itertools.chain.from_iterable(x.split(",") for x in arg) + if casefold: + return map(str.casefold, split_args) + return split_args diff --git a/cumulus_etl/common.py b/cumulus_etl/common.py index 999e8eb7..26d17a15 100644 --- a/cumulus_etl/common.py +++ b/cumulus_etl/common.py @@ -232,7 +232,8 @@ def write(self, obj: dict) -> None: # lazily create the file, to avoid 0-line ndjson files (unless created in __init__) self._ensure_file() - json.dump(obj, self._file) + # Specify separators for the most compact (no whitespace) representation saves disk space. + json.dump(obj, self._file, separators=(",", ":")) self._file.write("\n") @@ -316,21 +317,6 @@ def human_time_offset(seconds: int) -> str: return f"{_pretty_float(hours)}h" -def info_mode(): - logging.basicConfig() - logging.getLogger().setLevel(logging.INFO) - - -def debug_mode(): - logging.basicConfig() - logging.getLogger().setLevel(logging.DEBUG) - - -def warn_mode(): - logging.basicConfig() - logging.getLogger().setLevel(logging.WARN) - - def print_header(name: str | None = None) -> None: """Prints a section break to the console, with a name for the user""" rich.get_console().rule() diff --git a/cumulus_etl/errors.py b/cumulus_etl/errors.py index ef3f3888..992c8d94 100644 --- a/cumulus_etl/errors.py +++ b/cumulus_etl/errors.py @@ -37,6 +37,8 @@ MISSING_REQUESTED_RESOURCES = 36 TOO_MANY_SMART_CREDENTIALS = 37 BAD_SMART_CREDENTIAL = 38 +INLINE_TASK_FAILED = 39 +INLINE_WITHOUT_FOLDER = 40 class FatalError(Exception): @@ -44,13 +46,21 @@ class FatalError(Exception): class NetworkError(FatalError): - """An unrecoverable network error""" + """A network error""" def __init__(self, msg: str, response: httpx.Response): super().__init__(msg) self.response = response +class FatalNetworkError(NetworkError): + """An unrecoverable network error that should not be retried""" + + +class TemporaryNetworkError(NetworkError): + """An recoverable network error that could be retried""" + + class FhirConnectionConfigError(FatalError): """We needed to connect to a FHIR server but are not configured correctly""" diff --git a/cumulus_etl/etl/cli.py b/cumulus_etl/etl/cli.py index ca960032..04fbb79c 100644 --- a/cumulus_etl/etl/cli.py +++ b/cumulus_etl/etl/cli.py @@ -309,6 +309,9 @@ async def etl_main(args: argparse.Namespace) -> None: # Grab a list of all required resource types for the tasks we are running required_resources = set(t.resource for t in selected_tasks) + inline_resources = cli_utils.expand_inline_resources(args.inline_resource) + inline_mimetypes = cli_utils.expand_inline_mimetypes(args.inline_mimetype) + # Create a client to talk to a FHIR server. # This is useful even if we aren't doing a bulk export, because some resources like DocumentReference can still # reference external resources on the server (like the document text). @@ -326,6 +329,9 @@ async def etl_main(args: argparse.Namespace) -> None: since=args.since, until=args.until, resume=args.resume, + inline=args.inline, + inline_resources=inline_resources, + inline_mimetypes=inline_mimetypes, ) required_resources = await check_available_resources( diff --git a/cumulus_etl/etl/tasks/task_factory.py b/cumulus_etl/etl/tasks/task_factory.py index 36857475..1fc926c4 100644 --- a/cumulus_etl/etl/tasks/task_factory.py +++ b/cumulus_etl/etl/tasks/task_factory.py @@ -1,11 +1,10 @@ """Finds and creates ETL tasks""" -import itertools import sys from collections.abc import Iterable from typing import TypeVar -from cumulus_etl import errors +from cumulus_etl import cli_utils, errors from cumulus_etl.etl.studies import covid_symptom, hftest from cumulus_etl.etl.tasks import basic_tasks @@ -67,13 +66,11 @@ def get_selected_tasks( :param filter_tags: only tasks that have all the listed tags will be eligible for selection :returns: a list of selected EtlTask subclasses, to instantiate and run """ - names = names and set(itertools.chain.from_iterable(t.lower().split(",") for t in names)) - filter_tags = filter_tags and list( - itertools.chain.from_iterable(t.lower().split(",") for t in filter_tags) - ) - filter_tag_set = set(filter_tags or []) + names = set(cli_utils.expand_comma_list_arg(names, casefold=True)) + filter_tags = list(cli_utils.expand_comma_list_arg(filter_tags, casefold=True)) + filter_tag_set = set(filter_tags) - if names and "help" in names: + if "help" in names: # OK, we actually are just going to print the list of all task names and be done. _print_task_names() raise SystemExit(errors.TASK_HELP) # not an *error* exactly, but not successful ETL either diff --git a/cumulus_etl/export/cli.py b/cumulus_etl/export/cli.py index 3ce57f8d..19acc7f4 100644 --- a/cumulus_etl/export/cli.py +++ b/cumulus_etl/export/cli.py @@ -26,6 +26,9 @@ async def export_main(args: argparse.Namespace) -> None: required_resources = {t.resource for t in selected_tasks} using_default_tasks = not args.task and not args.task_filter + inline_resources = cli_utils.expand_inline_resources(args.inline_resource) + inline_mimetypes = cli_utils.expand_inline_mimetypes(args.inline_mimetype) + fhir_root = store.Root(args.url_input) client = fhir.create_fhir_client_for_cli(args, fhir_root, required_resources) @@ -39,6 +42,9 @@ async def export_main(args: argparse.Namespace) -> None: since=args.since, until=args.until, resume=args.resume, + inline=args.inline, + inline_resources=inline_resources, + inline_mimetypes=inline_mimetypes, ) await loader.load_from_bulk_export( sorted(required_resources), prefer_url_resources=using_default_tasks diff --git a/cumulus_etl/fhir/__init__.py b/cumulus_etl/fhir/__init__.py index c5d93e98..81d3b8d2 100644 --- a/cumulus_etl/fhir/__init__.py +++ b/cumulus_etl/fhir/__init__.py @@ -5,7 +5,9 @@ FhirUrl, download_reference, get_docref_note, + parse_content_type, parse_datetime, ref_resource, + request_attachment, unref_resource, ) diff --git a/cumulus_etl/fhir/fhir_client.py b/cumulus_etl/fhir/fhir_client.py index 1ac218da..b703f4ab 100644 --- a/cumulus_etl/fhir/fhir_client.py +++ b/cumulus_etl/fhir/fhir_client.py @@ -1,8 +1,10 @@ """HTTP client that talk to a FHIR server""" import argparse +import asyncio +import email import enum -from collections.abc import Iterable +from collections.abc import Callable, Iterable from json import JSONDecodeError import httpx @@ -28,6 +30,9 @@ class FhirClient: See https://hl7.org/fhir/smart-app-launch/backend-services.html for details. """ + # Limit the number of connections open at once, because EHRs tend to be very busy. + MAX_CONNECTIONS = 5 + def __init__( self, url: str | None, @@ -71,8 +76,7 @@ def __init__( self._capabilities: dict = {} async def __aenter__(self): - # Limit the number of connections open at once, because EHRs tend to be very busy. - limits = httpx.Limits(max_connections=5) + limits = httpx.Limits(max_connections=self.MAX_CONNECTIONS) timeout = 300 # five minutes to be generous # Follow redirects by default -- some EHRs definitely use them for bulk download files, # and might use them in other cases, who knows. @@ -86,7 +90,15 @@ async def __aexit__(self, exc_type, exc_value, traceback): await self._session.aclose() async def request( - self, method: str, path: str, headers: dict | None = None, stream: bool = False + self, + method: str, + path: str, + headers: dict | None = None, + stream: bool = False, + retry_delays: Iterable[int] | None = None, + request_callback: Callable[[], None] | None = None, + error_callback: Callable[[errors.NetworkError], None] | None = None, + retry_callback: Callable[[httpx.Response, int], None] | None = None, ) -> httpx.Response: """ Issues an HTTP request. @@ -104,62 +116,78 @@ async def request( :param path: relative path from the server root to request :param headers: optional header dictionary :param stream: whether to stream content in or load it all into memory at once + :param retry_delays: how many minutes to wait between retries, and how many retries to do, + defaults to [1, 1] which is three total tries across two minutes. + :param request_callback: called right before each request + :param error_callback: called after each network error + :param retry_callback: called right before sleeping :returns: The response object """ - url = fhir_auth.urljoin(self._server_root, path) + # A small note on this default retry value: + # We want to retry a few times, because EHRs can be flaky. But we don't want to retry TOO + # hard, since EHRs can disguise fatal errors behind a retryable error code (like 500 or + # 504). At least, I've seen Cerner seemingly do both. (Who can truly say if I retried that + # 504 error 100 times instead of 50, I'd have gotten through - but I'm assuming it was + # fatal.) It's not the worst thing to try hard to be certain, but since this is a widely + # used default value, let's not get too crazy with the delays unless the caller opts-in + # by providing even bigger delays as an argument. + retry_delays = [1, 1] if retry_delays is None else list(retry_delays) + retry_delays.append(None) # add a final no-delay request for the loop below + + # Actually loop, attempting the request multiple times as needed + for delay in retry_delays: + if request_callback: + request_callback() - final_headers = { - "Accept": "application/fhir+json", - "Accept-Charset": "UTF-8", - } - # merge in user headers with defaults - final_headers.update(headers or {}) + try: + return await self._one_request(method, path, headers=headers, stream=stream) + except errors.NetworkError as exc: + if error_callback: + error_callback(exc) - response = await self._request_with_signed_headers( - method, url, final_headers, stream=stream - ) + if delay is None or isinstance(exc, errors.FatalNetworkError): + raise - # Check if our access token expired and thus needs to be refreshed - if response.status_code == 401: - await self._auth.authorize(self._session, reauthorize=True) - if stream: - await response.aclose() - response = await self._request_with_signed_headers( - method, url, final_headers, stream=stream - ) + response = exc.response - try: - response.raise_for_status() - except httpx.HTTPStatusError as exc: - if exc.response.status_code == 429: - # 429 is a special kind of error -- it's not fatal, just a request to wait a bit. So let it pass. - return exc.response + # Respect Retry-After, but only if it lets us request faster than we would have + # otherwise. Which is maybe a little hostile, but this assumes that we are using + # reasonable delays ourselves (for example, our retry_delay list is in *minutes* not + # seconds). The point of this logic is so that the caller can reliably predict that + # if they give delays totaling 10 minutes, that's the longest we'll wait. + delay *= 60 # switch from minutes to seconds + delay = min(self.get_retry_after(response, delay), delay) - if stream: - await response.aread() - await response.aclose() + if retry_callback: + retry_callback(response, delay) - # All other 4xx or 5xx codes are treated as fatal errors - message = None - try: - json_response = exc.response.json() - if not isinstance(json_response, dict): - message = exc.response.text - elif json_response.get("resourceType") == "OperationOutcome": - issue = json_response["issue"][0] # just grab first issue - message = issue.get("details", {}).get("text") - message = message or issue.get("diagnostics") - except JSONDecodeError: - message = exc.response.text - if not message: - message = str(exc) + # And actually do the waiting + await asyncio.sleep(delay) - raise errors.NetworkError( - f'An error occurred when connecting to "{url}": {message}', - response, - ) from exc + def get_retry_after(self, response: httpx.Response, default: int) -> int: + """ + Returns the value of the Retry-After header, in seconds. - return response + Parsing can be tricky because the header is also allowed to be in http-date format, + providing a specific timestamp. + + Since seconds is easier to work with for the ETL, we normalize to seconds. + + See https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After + """ + value = response.headers.get("Retry-After", default) + try: + return max(0, int(value)) + except ValueError: + pass + + try: + retry_time = email.utils.parsedate_to_datetime(value) + except ValueError: + return default + + delay = retry_time - common.datetime_now() + return max(0, delay.total_seconds()) def get_capabilities(self) -> dict: """ @@ -215,6 +243,77 @@ async def _read_capabilities(self) -> None: self._capabilities = capabilities + async def _one_request( + self, method: str, path: str, headers: dict | None = None, stream: bool = False + ) -> httpx.Response: + url = fhir_auth.urljoin(self._server_root, path) + + final_headers = { + "Accept": "application/fhir+json", + "Accept-Charset": "UTF-8", + } + # merge in user headers with defaults + final_headers.update(headers or {}) + + response = await self._request_with_signed_headers( + method, url, final_headers, stream=stream + ) + + # Check if our access token expired and thus needs to be refreshed + if response.status_code == 401: + await self._auth.authorize(self._session, reauthorize=True) + if stream: + await response.aclose() + response = await self._request_with_signed_headers( + method, url, final_headers, stream=stream + ) + + try: + response.raise_for_status() + except httpx.HTTPStatusError as exc: + if stream: + await response.aread() + await response.aclose() + + # All other 4xx or 5xx codes are treated as fatal errors + message = None + try: + json_response = exc.response.json() + if not isinstance(json_response, dict): + message = exc.response.text + elif json_response.get("resourceType") == "OperationOutcome": + issue = json_response["issue"][0] # just grab first issue + message = issue.get("details", {}).get("text") + message = message or issue.get("diagnostics") + except JSONDecodeError: + message = exc.response.text + if not message: + message = str(exc) + + # Check if this is a retryable error, and flag it up the chain if so. + # See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status for more details. + if response.status_code in { + 408, # request timeout + 429, # too many requests (server is busy) + # 500 is so generic an error that servers may give it both for retryable cases and + # non-retryable cases. Cerner does this, for example. Since we can't distinguish + # between those cases, just always retry it. + 500, # internal server error (can be temporary blip) + 502, # bad gateway (can be temporary blip) + 503, # service unavailable (temporary blip) + 504, # gateway timeout (temporary blip) + }: + error_class = errors.TemporaryNetworkError + else: + error_class = errors.FatalNetworkError + + raise error_class( + f'An error occurred when connecting to "{url}": {message}', + response, + ) from exc + + return response + async def _request_with_signed_headers( self, method: str, url: str, headers: dict, **kwargs ) -> httpx.Response: diff --git a/cumulus_etl/fhir/fhir_utils.py b/cumulus_etl/fhir/fhir_utils.py index c717f19f..f2cb9af7 100644 --- a/cumulus_etl/fhir/fhir_utils.py +++ b/cumulus_etl/fhir/fhir_utils.py @@ -7,6 +7,7 @@ import urllib.parse from typing import TYPE_CHECKING +import httpx import inscriptis from cumulus_etl import common, errors @@ -181,7 +182,7 @@ async def download_reference(client: "FhirClient", reference: str) -> dict | Non ###################################################################################################################### -def _parse_content_type(content_type: str) -> (str, str): +def parse_content_type(content_type: str) -> (str, str): """Returns (mimetype, encoding)""" msg = email.message.EmailMessage() msg["content-type"] = content_type @@ -204,6 +205,20 @@ def _mimetype_priority(mimetype: str) -> int: return 0 +async def request_attachment(client: "FhirClient", attachment: dict) -> httpx.Response: + """ + Download the given attachment by URL. + """ + mimetype, _charset = parse_content_type(attachment["contentType"]) + return await client.request( + "GET", + attachment["url"], + # We need to pass Accept to get the raw data, not a Binary FHIR object. + # See https://www.hl7.org/fhir/binary.html + headers={"Accept": mimetype}, + ) + + async def _get_docref_note_from_attachment(client: "FhirClient", attachment: dict) -> str: """ Decodes or downloads a note from an attachment. @@ -212,21 +227,18 @@ async def _get_docref_note_from_attachment(client: "FhirClient", attachment: dic :returns: the attachment's note text """ - mimetype, charset = _parse_content_type(attachment["contentType"]) + _mimetype, charset = parse_content_type(attachment["contentType"]) if "data" in attachment: return base64.standard_b64decode(attachment["data"]).decode(charset) - # TODO: At some point we should centralize the downloading of attachments -- once we have multiple NLP tasks, - # we may not want to re-download the overlapping notes. When we do that, it should not be part of our bulk - # exporter, since we may be given already-exported ndjson. - # - # TODO: There are future optimizations to try to use our ctakes cache to avoid downloading in the first place: - # - use attachment["hash"] if available (algorithm mismatch though... maybe we should switch to sha1...) + # TODO: There are future optimizations to try to use our ctakes cache to avoid downloading in + # the first place: + # - use attachment["hash"] if available (algorithm mismatch though... maybe we should switch + # to sha1...) # - send a HEAD request with "Want-Digest: sha-256" but Cerner at least does not support that if "url" in attachment: - # We need to pass Accept to get the raw data, not a Binary object. See https://www.hl7.org/fhir/binary.html - response = await client.request("GET", attachment["url"], headers={"Accept": mimetype}) + response = await request_attachment(client, attachment) return response.text raise ValueError("No data or url field present") @@ -270,7 +282,7 @@ async def get_docref_note(client: "FhirClient", docref: dict) -> str: best_attachment_priority = 0 for index, attachment in enumerate(attachments): if "contentType" in attachment: - mimetype, _ = _parse_content_type(attachment["contentType"]) + mimetype, _ = parse_content_type(attachment["contentType"]) priority = _mimetype_priority(mimetype) if priority > best_attachment_priority: best_attachment_priority = priority diff --git a/cumulus_etl/inliner/__init__.py b/cumulus_etl/inliner/__init__.py new file mode 100644 index 00000000..a85b6e9b --- /dev/null +++ b/cumulus_etl/inliner/__init__.py @@ -0,0 +1,4 @@ +"""Inline attachments inside existing NDJSON""" + +from .cli import run_inline +from .inliner import inliner diff --git a/cumulus_etl/inliner/cli.py b/cumulus_etl/inliner/cli.py new file mode 100644 index 00000000..b36a8ed2 --- /dev/null +++ b/cumulus_etl/inliner/cli.py @@ -0,0 +1,61 @@ +"""Inline attachments inside NDJSON by downloading the URLs""" + +import argparse + +from cumulus_etl import cli_utils, common, errors, fhir, inliner, store + + +def define_inline_parser(parser: argparse.ArgumentParser) -> None: + parser.usage = "cumulus-etl inline [OPTION]... DIR FHIR_URL" + + parser.add_argument("src", metavar="/path/to/input") + parser.add_argument("url_input", metavar="https://fhir.example.com/") + + parser.add_argument( + "--resource", + metavar="RESOURCES", + action="append", + help="only consider this resource (default is all supported inline targets: " + "DiagnosticReport and DocumentReference)", + ) + parser.add_argument( + "--mimetype", + metavar="MIMETYPES", + action="append", + help="only inline this attachment mimetype (default is text, HTML, and XHTML)", + ) + + cli_utils.add_auth(parser, use_fhir_url=False) + cli_utils.add_aws(parser) + + +async def inline_main(args: argparse.Namespace) -> None: + """Exports data from an EHR to a folder.""" + # record filesystem options before creating Roots + store.set_user_fs_options(vars(args)) + + src_root = store.Root(args.src) + fhir_root = store.Root(args.url_input) + + # Help the user in case they got the order of the arguments wrong. + if fhir_root.protocol not in {"http", "https"}: + errors.fatal( + f"Provided URL {args.url_input} does not look like a URL. " + "See --help for argument usage.", + errors.ARGS_INVALID, + ) + + resources = cli_utils.expand_inline_resources(args.resource) + mimetypes = cli_utils.expand_inline_mimetypes(args.mimetype) + + common.print_header() + + async with fhir.create_fhir_client_for_cli(args, fhir_root, {"Binary"} | resources) as client: + await inliner.inliner(client, src_root, resources, mimetypes) + + +async def run_inline(parser: argparse.ArgumentParser, argv: list[str]) -> None: + """Parses an inline CLI""" + define_inline_parser(parser) + args = parser.parse_args(argv) + await inline_main(args) diff --git a/cumulus_etl/inliner/inliner.py b/cumulus_etl/inliner/inliner.py new file mode 100644 index 00000000..7ec4d34a --- /dev/null +++ b/cumulus_etl/inliner/inliner.py @@ -0,0 +1,224 @@ +import base64 +import dataclasses +import hashlib +import tempfile +from collections.abc import Iterable +from functools import partial + +import cumulus_fhir_support +import fsspec +import rich.progress +import rich.table + +from cumulus_etl import cli_utils, common, errors, fhir, store +from cumulus_etl.inliner import reader, writer + + +@dataclasses.dataclass +class InliningStats: + total_attachments: int = 0 + total_resources: int = 0 + + already_inlined_attachments: int = 0 + already_inlined_resources: int = 0 + + newly_inlined_attachments: int = 0 + newly_inlined_resources: int = 0 + + fatal_error_attachments: int = 0 + fatal_error_resources: int = 0 + + fatal_retry_attachments: int = 0 + fatal_retry_resources: int = 0 + + def merge_attachment_stats(self, other: "InliningStats") -> None: + """Takes stats for attachments in a resource and merges them into the resource's stats""" + for field in { + "total_attachments", + "already_inlined_attachments", + "newly_inlined_attachments", + "fatal_error_attachments", + "fatal_retry_attachments", + }: + if other_val := getattr(other, field): + setattr(self, field, getattr(self, field) + other_val) + resource_field = field.replace("attachments", "resources") + setattr(self, resource_field, getattr(self, resource_field) + 1) + + +async def inliner( + client: fhir.FhirClient, + in_root: store.Root, + resources: Iterable[str], + mimetypes: Iterable[str], +) -> None: + mimetypes = set(mimetypes) + + # Grab files to read for the given resources + found_files = cumulus_fhir_support.list_multiline_json_in_dir( + in_root.path, resources, fsspec_fs=in_root.fs + ) + + # Predict how much work we'll have to do by getting counts of lines and files + if in_root.protocol == "file": + total_lines = sum(common.read_local_line_count(path) for path in found_files) + else: + # We shouldn't double our network usage just for pretty progress bars... + total_lines = 0 + total_files = len(found_files) + + # Actually do the work + stats = InliningStats() + with cli_utils.make_progress_bar() as progress: + progress_task = progress.add_task("Inlining…", total=total_lines or total_files) + for path in found_files: + await _inline_one_file( + client, + path, + in_root.fs, + mimetypes=mimetypes, + stats=stats, + progress=progress if total_lines else None, + progress_task=progress_task if total_lines else None, + ) + if not total_lines: + progress.update(progress_task, advance=1) + + table = rich.table.Table( + "", + rich.table.Column(header="Attachments", justify="right"), + rich.table.Column(header="Resources", justify="right"), + box=None, + ) + table.add_row("Total examined", f"{stats.total_attachments:,}", f"{stats.total_resources:,}") + if stats.already_inlined_attachments: + table.add_row( + "Already inlined", + f"{stats.already_inlined_attachments:,}", + f"{stats.already_inlined_resources:,}", + ) + table.add_row( + "Newly inlined", + f"{stats.newly_inlined_attachments:,}", + f"{stats.newly_inlined_resources:,}", + ) + if stats.fatal_error_attachments: + table.add_row( + "Fatal errors", f"{stats.fatal_error_attachments:,}", f"{stats.fatal_error_resources:,}" + ) + if stats.fatal_retry_resources: + table.add_row( + "Retried but gave up", + f"{stats.fatal_retry_attachments:,}", + f"{stats.fatal_retry_resources:,}", + ) + rich.get_console().print(table) + + +async def _inline_one_file( + client: fhir.FhirClient, + path: str, + fs: fsspec.AbstractFileSystem, + *, + mimetypes: set[str], + stats: InliningStats, + progress: rich.progress.Progress | None, + progress_task: rich.progress.TaskID | None, +) -> None: + with tempfile.NamedTemporaryFile() as output_file: + # Use an ordered NDJSON writer to preserve the order of lines in the input file, + # which preserves the ability for users to append updated row data to files. + with writer.OrderedNdjsonWriter(output_file.name) as output: + await reader.peek_ahead_processor( + cumulus_fhir_support.read_multiline_json(path, fsspec_fs=fs), + partial( + _inline_one_line, + client=client, + mimetypes=mimetypes, + output=output, + stats=stats, + progress=progress, + progress_task=progress_task, + ), + # Look at twice the allowed connections - downloads will block, but that's OK. + # This will allow us to download some attachments while other workers are sleeping + # because they are waiting to retry due to an HTTP error. + peek_at=fhir.FhirClient.MAX_CONNECTIONS * 2, + ) + # Atomically swap out the inlined version with the original + with fs.transaction: + fs.put_file(output_file.name, path) + + +async def _inline_one_line( + index: int, + resource: dict, + *, + client: fhir.FhirClient, + mimetypes: set[str], + output: writer.OrderedNdjsonWriter, + stats: InliningStats, + progress: rich.progress.Progress | None, + progress_task: rich.progress.TaskID | None, +) -> None: + match resource.get("resourceType"): + case "DiagnosticReport": + attachments = resource.get("presentedForm", []) + case "DocumentReference": + attachments = [ + content["attachment"] + for content in resource.get("content", []) + if "attachment" in content + ] + case _: + attachments = [] # don't do anything, but we will leave the resource line in place + + attachment_stats = InliningStats() + for attachment in attachments: + await _inline_one_attachment( + client, attachment, mimetypes=mimetypes, stats=attachment_stats + ) + stats.merge_attachment_stats(attachment_stats) + + output.write(index, resource) + if progress: + progress.update(progress_task, advance=1) + + +async def _inline_one_attachment( + client: fhir.FhirClient, attachment: dict, *, mimetypes: set[str], stats: InliningStats +) -> None: + # First, check if we should even examine this attachment + if "contentType" not in attachment: + return + mimetype, _charset = fhir.parse_content_type(attachment["contentType"]) + if mimetype not in mimetypes: + return + + # OK - this is a valid attachment to process + stats.total_attachments += 1 + + if "data" in attachment: + stats.already_inlined_attachments += 1 + return + + if "url" not in attachment: + return # neither data nor url... nothing to do + + try: + response = await fhir.request_attachment(client, attachment) + except errors.FatalNetworkError: + stats.fatal_error_attachments += 1 + return + except errors.TemporaryNetworkError: + stats.fatal_retry_attachments += 1 + return + + stats.newly_inlined_attachments += 1 + + attachment["data"] = base64.standard_b64encode(response.content).decode("ascii") + # Overwrite other associated metadata with latest info (existing metadata might now be stale) + attachment["contentType"] = f"{mimetype}; charset={response.encoding}" + attachment["size"] = len(response.content) + sha1_hash = hashlib.sha1(response.content).digest() # noqa: S324 + attachment["hash"] = base64.standard_b64encode(sha1_hash).decode("ascii") diff --git a/cumulus_etl/inliner/reader.py b/cumulus_etl/inliner/reader.py new file mode 100644 index 00000000..10b7fd1c --- /dev/null +++ b/cumulus_etl/inliner/reader.py @@ -0,0 +1,73 @@ +import asyncio +from collections.abc import Awaitable, Callable, Iterable +from typing import TypeVar + +from cumulus_etl import errors + +Item = TypeVar("Item") + + +def _drain_queue(queue: asyncio.Queue) -> None: + """ + Used to empty the queue if we are early exiting. + + Once we depend on Python 3.13, we can just use Queue.shutdown() instead. + """ + while not queue.empty(): + queue.get_nowait() + queue.task_done() + + +async def _worker( + queue: asyncio.Queue, shutdown: asyncio.Event, processor: Callable[[int, Item], Awaitable[None]] +) -> None: + while True: + index, item = await queue.get() + try: + await processor(index, item) + except Exception: + shutdown.set() # flag other tasks to stop + raise + finally: + queue.task_done() + if shutdown.is_set(): + _drain_queue(queue) + + +async def _reader(queue: asyncio.Queue, shutdown: asyncio.Event, iterable: Iterable[Item]) -> None: + for index, item in enumerate(iterable): + await queue.put((index, item)) + if shutdown.is_set(): + _drain_queue(queue) + break + + +async def peek_ahead_processor( + iterable: Iterable[Item], + processor: Callable[[int, Item], Awaitable[None]], + *, + peek_at: int, +) -> None: + """Processes items in sequence, but always looks at some in parallel""" + queue = asyncio.Queue(peek_at) + shutdown = asyncio.Event() # poor substitute for Python 3.13's wonderful Queue.shutdown() + reader = asyncio.create_task(_reader(queue, shutdown, iterable), name="peek-ahead-reader") + tasks = [ + asyncio.create_task(_worker(queue, shutdown, processor), name=f"peek-ahead-worker-{i}") + for i in range(peek_at) + ] + + try: + await reader # read all the input, processing as we go + await queue.join() # finish up final batch of workers + finally: + # Close out the tasks + for task in tasks: + task.cancel() + + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Check for an early exit in the tasks + for result in results: + if result and not isinstance(result, asyncio.CancelledError): + errors.fatal(str(result), errors.INLINE_TASK_FAILED) diff --git a/cumulus_etl/inliner/writer.py b/cumulus_etl/inliner/writer.py new file mode 100644 index 00000000..0570c0f0 --- /dev/null +++ b/cumulus_etl/inliner/writer.py @@ -0,0 +1,46 @@ +from cumulus_etl import common + + +class OrderedNdjsonWriter: + """ + Convenience context manager to write multiple objects to a ndjson file in order. + + Specifically, it will keep the output in the intended order, even if lines are provided + out of order. + + Note that this is not atomic - partial writes will make it to the target file. + And queued writes may not make it to the target file at all, if interrupted. + """ + + def __init__(self, path: str): + self._writer = common.NdjsonWriter(path) + self._queued_rows: dict[int, dict] = {} + self._current_num: int = 0 + + def __enter__(self): + self._writer.__enter__() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._writer.__exit__(exc_type, exc_value, traceback) + + def _process_queue(self) -> None: + while self._current_num in self._queued_rows: + self._writer.write(self._queued_rows.pop(self._current_num)) + self._current_num += 1 + + def write(self, index: int, obj: dict) -> None: + """ + Writes the object to the file at the specified row number. + + May hold the row in memory until previous rows can be written first. + """ + # We just queue the rows in memory until we write them out. Our expectation is that we + # won't hold so many in the queue that it will be a memory issue. + # + # If this does turn out to be a problem, we can explore other solutions like keeping + # track of byte indices for missing rows and seeking to that location, then inserting data + # (but you'd have to be careful to shift-down/re-write the later row data and update + # other byte indices). + self._queued_rows[index] = obj + self._process_queue() diff --git a/cumulus_etl/loaders/fhir/bulk_export.py b/cumulus_etl/loaders/fhir/bulk_export.py index c6b387c6..a08bebaa 100644 --- a/cumulus_etl/loaders/fhir/bulk_export.py +++ b/cumulus_etl/loaders/fhir/bulk_export.py @@ -25,7 +25,7 @@ class BulkExporter: - Epic (https://www.epic.com/) """ - _TIMEOUT_THRESHOLD = 60 * 60 * 24 # a day, which is probably an overly generous timeout + _TIMEOUT_THRESHOLD = 60 * 60 * 24 * 30 # thirty days (we've seen some multi-week Epic waits) def __init__( self, @@ -280,73 +280,59 @@ async def _request_with_retries( :param retry_errors: if True, server-side errors will be retried a few times :returns: the HTTP response """ - # Set up error handling variables. - # These times are extremely generous - partly because we can afford to be - # as a long-running async task and partly because EHR servers seem prone to - # outages that clear up after a bit. - error_retry_minutes = [1, 2, 4, 8] # and then raise - max_errors = len(error_retry_minutes) - num_errors = 0 + + def _add_new_delay(response: httpx.Response, delay: int) -> None: + # Print a message to the user, so they don't see us do nothing for a while + if rich_text is not None: + progress_msg = response.headers.get("X-Progress", "waiting…") + formatted_total = common.human_time_offset(self._total_wait_time) + formatted_delay = common.human_time_offset(delay) + rich_text.plain = ( + f"{progress_msg} ({formatted_total} so far, waiting for {formatted_delay} more)" + ) + + self._total_wait_time += delay # Actually loop, attempting the request multiple times as needed while self._total_wait_time < self._TIMEOUT_THRESHOLD: - if log_request: - log_request() - - try: - response = await self._client.request(method, path, headers=headers, stream=stream) - except errors.NetworkError as exc: - if log_error: - log_error(exc) - if retry_errors and exc.response.is_server_error and num_errors < max_errors: - response = exc.response - else: - raise + response = await self._client.request( + method, + path, + headers=headers, + stream=stream, + # These retry times are extremely generous - partly because we can afford to be + # as a long-running async task and partly because EHR servers seem prone to + # outages that clear up after a bit. + retry_delays=[1, 2, 4, 8], # five tries, 15 minutes total + request_callback=log_request, + error_callback=log_error, + retry_callback=_add_new_delay, + ) if response.status_code == target_status_code: return response - if response.is_server_error: - num_errors += 1 - else: - num_errors = 0 # reset count if server is back to normal - - # 202 == server is still working on it, 429 == server is busy, - # 5xx == server-side error. In all cases, we wait. - if response.status_code in [202, 429] or response.is_server_error: - if log_progress and not response.is_server_error: + # 202 == server is still working on it. + if response.status_code == 202: + if log_progress: log_progress(response) - # Calculate how long to wait, with a basic exponential backoff for errors. - if num_errors: - default_delay = error_retry_minutes[num_errors - 1] * 60 - else: - default_delay = 60 # one minute - delay = int(response.headers.get("Retry-After", default_delay)) - if response.status_code == 202: - # Some servers can request unreasonably long delays (e.g. I've seen Cerner - # ask for five hours), which is... not helpful for our UX and often way - # too long for small exports. So as long as the server isn't telling us - # it's overloaded or erroring out, limit the delay time to 5 minutes. - delay = min(delay, 300) - - # Print a message to the user, so they don't see us do nothing for a while - if rich_text is not None: - progress_msg = response.headers.get("X-Progress", "waiting…") - formatted_total = common.human_time_offset(self._total_wait_time) - formatted_delay = common.human_time_offset(delay) - rich_text.plain = f"{progress_msg} ({formatted_total} so far, waiting for {formatted_delay} more)" - - # And wait as long as the server requests + # Some servers can request unreasonably long delays (e.g. I've seen Cerner + # ask for five hours), which is... not helpful for our UX and often way + # too long for small exports. So limit the delay time to 5 minutes. + delay = min(self._client.get_retry_after(response, 60), 300) + + _add_new_delay(response, delay) await asyncio.sleep(delay) - self._total_wait_time += delay else: - # It feels silly to abort on an unknown *success* code, but the spec has such clear guidance on - # what the expected response codes are, that it's not clear if a code outside those parameters means - # we should keep waiting or stop waiting. So let's be strict here for now. + # It feels silly to abort on an unknown *success* code, but the spec has such clear + # guidance on what the expected response codes are, that it's not clear if a code + # outside those parameters means we should keep waiting or stop waiting. + # So let's be strict here for now. raise errors.NetworkError( - f"Unexpected status code {response.status_code} from the bulk FHIR export server.", + f"Unexpected status code {response.status_code} " + "from the bulk FHIR export server.", response, ) @@ -443,7 +429,6 @@ async def _download_ndjson_file( lines = common.read_local_line_count(filename) self._log.download_complete(url, lines, decompressed_size) - url_last_part = url.split("/")[-1] filename_last_part = filename.split("/")[-1] human_size = common.human_file_size(response.num_bytes_downloaded) - print(f" Downloaded {url_last_part} as {filename_last_part} ({human_size})") + print(f" Downloaded {filename_last_part} ({human_size})") diff --git a/cumulus_etl/loaders/fhir/ndjson_loader.py b/cumulus_etl/loaders/fhir/ndjson_loader.py index 91078414..f38f308b 100644 --- a/cumulus_etl/loaders/fhir/ndjson_loader.py +++ b/cumulus_etl/loaders/fhir/ndjson_loader.py @@ -4,7 +4,7 @@ import cumulus_fhir_support -from cumulus_etl import cli_utils, common, errors, fhir, store +from cumulus_etl import cli_utils, common, errors, fhir, inliner, store from cumulus_etl.loaders import base from cumulus_etl.loaders.fhir.bulk_export import BulkExporter from cumulus_etl.loaders.fhir.export_log import BulkExportLogParser @@ -23,6 +23,9 @@ def __init__( since: str | None = None, until: str | None = None, resume: str | None = None, + inline: bool = False, + inline_resources: set[str] | None = None, + inline_mimetypes: set[str] | None = None, ): """ :param root: location to load ndjson from @@ -38,6 +41,9 @@ def __init__( self.since = since self.until = until self.resume = resume + self.inline = inline + self.inline_resources = inline_resources + self.inline_mimetypes = inline_mimetypes async def detect_resources(self) -> set[str] | None: if self.root.protocol in {"http", "https"}: @@ -56,7 +62,7 @@ async def load_resources(self, resources: set[str]) -> base.LoaderResults: bulk_dir = await self.load_from_bulk_export(resources) input_root = store.Root(bulk_dir.name) else: - if self.export_to or self.since or self.until or self.resume: + if self.export_to or self.since or self.until or self.resume or self.inline: errors.fatal( "You provided FHIR bulk export parameters but did not provide a FHIR server", errors.ARGS_CONFLICT, @@ -91,6 +97,15 @@ async def load_from_bulk_export( """ target_dir = cli_utils.make_export_dir(self.export_to) + if self.inline and not self.export_to: + errors.fatal( + "Attachment inlining requested, but without an export folder. " + "If you want to save inlined attachments for archiving, please specify an " + "export folder to preserve the downloaded NDJSON with --export-to. " + "See --help for more information.", + errors.INLINE_WITHOUT_FOLDER, + ) + try: bulk_exporter = BulkExporter( self.client, @@ -107,6 +122,15 @@ async def load_from_bulk_export( except errors.FatalError as exc: errors.fatal(str(exc), errors.BULK_EXPORT_FAILED) + if self.inline: + common.print_header() + await inliner.inliner( + self.client, + store.Root(target_dir.name), + self.inline_resources, + self.inline_mimetypes, + ) + return target_dir def read_loader_results( diff --git a/cumulus_etl/store.py b/cumulus_etl/store.py index a25ca752..188c0041 100644 --- a/cumulus_etl/store.py +++ b/cumulus_etl/store.py @@ -102,10 +102,6 @@ def makedirs(self, path: str) -> None: return self.fs.makedirs(path, exist_ok=True) - def rm(self, path: str, recursive=False) -> None: - """Delete a file (alias for fs.rm)""" - self.fs.rm(path, recursive=recursive) - def fsspec_options(self) -> dict: """Provides a set of storage option kwargs for fsspec calls""" return get_fs_options(self.protocol) diff --git a/docs/bulk-exports.md b/docs/bulk-exports.md index c7cf0c1f..2f5db9ca 100644 --- a/docs/bulk-exports.md +++ b/docs/bulk-exports.md @@ -121,6 +121,63 @@ This is what you may expect to archive: (this will hold a list of errors from the FHIR server as well as warnings and informational messages, despite the name) +## Downloading Clinical Notes Ahead of Time + +If you are interested in running NLP tasks, +that will usually involve downloading a lot of clinical note attachments found +in DocumentReferences (and/or DiagnosticReports). + +Since EHRs can be a little flaky, old attachment URLs may move, +and/or in case you later lose access to the EHR, +it is recommended that you download the attachments ahead of time and only once by _inlining_ them. + +### What's Inlining? + +Inlining is the process of taking an original NDJSON attachment definition like this: +```json +{ + "url": "https://example.com/Binary/document123", + "contentType": "text/html" +} +``` + +Then downloading the referenced URL, +and stuffing the results back into the NDJSON with some extra metadata like so: +```json +{ + "url": "https://example.com/Binary/document123", + "contentType": "text/html; charset=utf8", + "data": "aGVsbG8gd29ybGQ=", + "size": 11, + "hash": "Kq5sNclPz7QV2+lfQIuc6R7oRu0=" +} +``` + +Now the data is stored locally in your downloaded NDJSON +and can be processed independently of the EHR. + +### How to Inline + +Cumulus ETL has a special inlining mode. +Simply run the following command, +pointing at both a source NDJSON folder and your EHR's FHIR URL. + +```shell +cumulus-etl inline ./ndjson-folder FHIR_URL --smart-client-id XXX --smart-key /YYY +``` + +{: .note } +This will modify the data in the input folder! + +By default, this will inline text, HTML, and XHTML attachments +for any DiagnosticReports and DocumentReferences found. +But there are options to adjust those defaults. +See `--help` for more information. + +If you are using Cumulus ETL to do your bulk exporting, +you can simply pass `--inline` to the export command (see `--help` for more inlining options) +in order to inline as part of the bulk export process. + ## Resuming an Interrupted Export Bulk exports can be brittle. diff --git a/pyproject.toml b/pyproject.toml index 445d25e4..8ab095be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ dependencies = [ "openai < 2", "oracledb < 3", "philter-lite < 1", - "pyarrow < 19", + "pyarrow < 20", "rich < 14", "s3fs", ] diff --git a/tests/data/nlp-regression/expected-output/covid_symptom__nlp_results.000.ndjson b/tests/data/nlp-regression/expected-output/covid_symptom__nlp_results.000.ndjson index d55648b7..d3510eed 100644 --- a/tests/data/nlp-regression/expected-output/covid_symptom__nlp_results.000.ndjson +++ b/tests/data/nlp-regression/expected-output/covid_symptom__nlp_results.000.ndjson @@ -1,16 +1,16 @@ -{"id": "032b2ff6af8c883760d5a44e32ff80454d69551de6438c46be64604ddc744156.0", "docref_id": "032b2ff6af8c883760d5a44e32ff80454d69551de6438c46be64604ddc744156", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 608, "end": 615, "text": "fatigue", "polarity": 0, "conceptAttributes": [{"code": "248274002", "cui": "C0015672", "codingScheme": "SNOMEDCT_US", "tui": "T184"}, {"code": "84229001", "cui": "C0015672", "codingScheme": "SNOMEDCT_US", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "032b2ff6af8c883760d5a44e32ff80454d69551de6438c46be64604ddc744156.1", "docref_id": "032b2ff6af8c883760d5a44e32ff80454d69551de6438c46be64604ddc744156", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 608, "end": 615, "text": "fatigue", "polarity": 0, "conceptAttributes": [{"code": "n/a", "cui": "C0015672", "codingScheme": "custom", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "032b2ff6af8c883760d5a44e32ff80454d69551de6438c46be64604ddc744156.2", "docref_id": "032b2ff6af8c883760d5a44e32ff80454d69551de6438c46be64604ddc744156", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 812, "end": 821, "text": "headaches", "polarity": 0, "conceptAttributes": [{"code": "n/a", "cui": "C0018681", "codingScheme": "custom", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e.0", "docref_id": "05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 6, "end": 14, "text": "Headache", "polarity": 0, "conceptAttributes": [{"code": "25064002", "cui": "C0018681", "codingScheme": "SNOMEDCT_US", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e.1", "docref_id": "05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 6, "end": 14, "text": "Headache", "polarity": 0, "conceptAttributes": [{"code": "n/a", "cui": "C0018681", "codingScheme": "custom", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e.2", "docref_id": "05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 114, "end": 133, "text": "nausea and vomiting", "polarity": 0, "conceptAttributes": [{"code": "16932000", "cui": "C0027498", "codingScheme": "SNOMEDCT_US", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e.3", "docref_id": "05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 114, "end": 133, "text": "nausea and vomiting", "polarity": 0, "conceptAttributes": [{"code": "n/a", "cui": "C0027498", "codingScheme": "custom", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e.4", "docref_id": "05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 603, "end": 611, "text": "fatigued", "polarity": 0, "conceptAttributes": [{"code": "n/a", "cui": "C0015672", "codingScheme": "custom", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "13e748c21a7c50f6c59fc4613683cd5d7f76bd5d68fda20f4e81ccce74ea7930.2", "docref_id": "13e748c21a7c50f6c59fc4613683cd5d7f76bd5d68fda20f4e81ccce74ea7930", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 303, "end": 318, "text": "short of breath", "polarity": 0, "conceptAttributes": [{"code": "n/a", "cui": "C0013404", "codingScheme": "custom", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8.0", "docref_id": "36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 343, "end": 348, "text": "cough", "polarity": 0, "conceptAttributes": [{"code": "263731006", "cui": "C0010200", "codingScheme": "SNOMEDCT_US", "tui": "T184"}, {"code": "49727002", "cui": "C0010200", "codingScheme": "SNOMEDCT_US", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8.1", "docref_id": "36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 343, "end": 348, "text": "cough", "polarity": 0, "conceptAttributes": [{"code": "n/a", "cui": "C0010200", "codingScheme": "custom", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8.2", "docref_id": "36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 350, "end": 356, "text": "fevers", "polarity": 0, "conceptAttributes": [{"code": "n/a", "cui": "C0015967", "codingScheme": "custom", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8.3", "docref_id": "36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 372, "end": 378, "text": "chills", "polarity": 0, "conceptAttributes": [{"code": "n/a", "cui": "C0085593", "codingScheme": "custom", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8.4", "docref_id": "36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 1536, "end": 1541, "text": "fever", "polarity": 0, "conceptAttributes": [{"code": "386661006", "cui": "C0015967", "codingScheme": "SNOMEDCT_US", "tui": "T184"}, {"code": "50177009", "cui": "C0015967", "codingScheme": "SNOMEDCT_US", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8.5", "docref_id": "36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4, "match": {"begin": 1536, "end": 1541, "text": "fever", "polarity": 0, "conceptAttributes": [{"code": "n/a", "cui": "C0015967", "codingScheme": "custom", "tui": "T184"}], "type": "SignSymptomMention"}} -{"id": "364aa545eca0a9744bc67c5ad914e2e9e35dd39a5c1f1a8f902e533a8641238d.0", "docref_id": "364aa545eca0a9744bc67c5ad914e2e9e35dd39a5c1f1a8f902e533a8641238d", "encounter_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "subject_id": "827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5", "task_version": 4} +{"id":"032b2ff6af8c883760d5a44e32ff80454d69551de6438c46be64604ddc744156.0","docref_id":"032b2ff6af8c883760d5a44e32ff80454d69551de6438c46be64604ddc744156","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":608,"end":615,"text":"fatigue","polarity":0,"conceptAttributes":[{"code":"248274002","cui":"C0015672","codingScheme":"SNOMEDCT_US","tui":"T184"},{"code":"84229001","cui":"C0015672","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"032b2ff6af8c883760d5a44e32ff80454d69551de6438c46be64604ddc744156.1","docref_id":"032b2ff6af8c883760d5a44e32ff80454d69551de6438c46be64604ddc744156","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":608,"end":615,"text":"fatigue","polarity":0,"conceptAttributes":[{"code":"n/a","cui":"C0015672","codingScheme":"custom","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"032b2ff6af8c883760d5a44e32ff80454d69551de6438c46be64604ddc744156.2","docref_id":"032b2ff6af8c883760d5a44e32ff80454d69551de6438c46be64604ddc744156","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":812,"end":821,"text":"headaches","polarity":0,"conceptAttributes":[{"code":"n/a","cui":"C0018681","codingScheme":"custom","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e.0","docref_id":"05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":6,"end":14,"text":"Headache","polarity":0,"conceptAttributes":[{"code":"25064002","cui":"C0018681","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e.1","docref_id":"05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":6,"end":14,"text":"Headache","polarity":0,"conceptAttributes":[{"code":"n/a","cui":"C0018681","codingScheme":"custom","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e.2","docref_id":"05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":114,"end":133,"text":"nausea and vomiting","polarity":0,"conceptAttributes":[{"code":"16932000","cui":"C0027498","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e.3","docref_id":"05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":114,"end":133,"text":"nausea and vomiting","polarity":0,"conceptAttributes":[{"code":"n/a","cui":"C0027498","codingScheme":"custom","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e.4","docref_id":"05d0686aec0a65069a1e5b1a4937f5196b75ae336b7fbe10300882184523f95e","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":603,"end":611,"text":"fatigued","polarity":0,"conceptAttributes":[{"code":"n/a","cui":"C0015672","codingScheme":"custom","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"13e748c21a7c50f6c59fc4613683cd5d7f76bd5d68fda20f4e81ccce74ea7930.2","docref_id":"13e748c21a7c50f6c59fc4613683cd5d7f76bd5d68fda20f4e81ccce74ea7930","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":303,"end":318,"text":"short of breath","polarity":0,"conceptAttributes":[{"code":"n/a","cui":"C0013404","codingScheme":"custom","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8.0","docref_id":"36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":343,"end":348,"text":"cough","polarity":0,"conceptAttributes":[{"code":"263731006","cui":"C0010200","codingScheme":"SNOMEDCT_US","tui":"T184"},{"code":"49727002","cui":"C0010200","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8.1","docref_id":"36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":343,"end":348,"text":"cough","polarity":0,"conceptAttributes":[{"code":"n/a","cui":"C0010200","codingScheme":"custom","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8.2","docref_id":"36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":350,"end":356,"text":"fevers","polarity":0,"conceptAttributes":[{"code":"n/a","cui":"C0015967","codingScheme":"custom","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8.3","docref_id":"36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":372,"end":378,"text":"chills","polarity":0,"conceptAttributes":[{"code":"n/a","cui":"C0085593","codingScheme":"custom","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8.4","docref_id":"36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":1536,"end":1541,"text":"fever","polarity":0,"conceptAttributes":[{"code":"386661006","cui":"C0015967","codingScheme":"SNOMEDCT_US","tui":"T184"},{"code":"50177009","cui":"C0015967","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8.5","docref_id":"36ecd07bc327bba4e5ea36e34e66ca7f4f54360aef5bbcafc745c9f144aa87f8","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4,"match":{"begin":1536,"end":1541,"text":"fever","polarity":0,"conceptAttributes":[{"code":"n/a","cui":"C0015967","codingScheme":"custom","tui":"T184"}],"type":"SignSymptomMention"}} +{"id":"364aa545eca0a9744bc67c5ad914e2e9e35dd39a5c1f1a8f902e533a8641238d.0","docref_id":"364aa545eca0a9744bc67c5ad914e2e9e35dd39a5c1f1a8f902e533a8641238d","encounter_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","subject_id":"827db3458e3d956437c2b43f441eca441851c2f2e937e2c5467fdd0c5f980db5","task_version":4} diff --git a/tests/fhir/test_fhir_client.py b/tests/fhir/test_fhir_client.py index ae9197c4..7efcb532 100644 --- a/tests/fhir/test_fhir_client.py +++ b/tests/fhir/test_fhir_client.py @@ -6,6 +6,7 @@ from unittest import mock import ddt +import httpx import respx from jwcrypto import jwk, jwt @@ -25,6 +26,8 @@ class TestFhirClient(AsyncTestCase): def setUp(self): super().setUp() + self.sleep_mock = self.patch("asyncio.sleep") + # By default, set up a working server and auth. Tests can break things as needed. self.client_id = "my-client-id" @@ -461,22 +464,6 @@ async def test_get_error_401(self, stream_mode): self.assertEqual(2, self.respx_mock["token"].call_count) - async def test_get_error_429(self): - """Verify that 429 errors are passed through and not treated as exceptions.""" - self.respx_mock.get(f"{self.server_url}/retry-me").respond(429) - self.respx_mock.get(f"{self.server_url}/nope").respond(430) - - async with fhir.FhirClient( - self.server_url, [], smart_client_id=self.client_id, smart_jwks=self.jwks - ) as server: - # Confirm 429 passes - response = await server.request("GET", "retry-me") - self.assertEqual(429, response.status_code) - - # Sanity check that 430 does not - with self.assertRaises(errors.FatalError): - await server.request("GET", "nope") - @ddt.data( # OperationOutcome { @@ -552,6 +539,84 @@ def test_reads_smart_key(self, contents, suffix, expected_result, mock_client): fhir.create_fhir_client_for_cli(args, store.Root("/tmp"), []) self.assertLessEqual(expected_result.items(), mock_client.call_args[1].items()) + @ddt.data( + (None, 120), # default to the caller's retry delay + ("10", 10), # accept shorter retry delays if the server lets us + ("200", 120), # but cap server retry delays by the caller's retry delay + ("Tue, 14 Sep 2021 21:23:58 GMT", 13), # parse http-dates too + ("abc", 120), # if parsing fails, use caller's retry delay + ("-5", 0), # floor of zero + ("Mon, 13 Sep 2021 21:23:58 GMT", 0), # floor of zero on dates too + ) + @ddt.unpack + async def test_retry_after_parsing(self, retry_after_header, expected_delay): + headers = {"Retry-After": retry_after_header} if retry_after_header else {} + self.respx_mock.get(f"{self.server_url}/file").respond(headers=headers, status_code=503) + + async with fhir.FhirClient(self.server_url, [], bearer_token="foo") as server: + with self.assertRaises(errors.TemporaryNetworkError): + await server.request("GET", "file", retry_delays=[2]) + self.assertEqual(self.sleep_mock.call_count, 1) + self.assertEqual(self.sleep_mock.call_args[0][0], expected_delay) + + async def test_callbacks(self): + self.respx_mock.get(f"{self.server_url}/file").respond(status_code=503) + request_callback = mock.MagicMock() + error_callback = mock.MagicMock() + retry_callback = mock.MagicMock() + + async with fhir.FhirClient(self.server_url, [], bearer_token="foo") as server: + with self.assertRaises(errors.TemporaryNetworkError): + await server.request( + "GET", + "file", + retry_delays=[1, 2], + request_callback=request_callback, + error_callback=error_callback, + retry_callback=retry_callback, + ) + + self.assertEqual(self.sleep_mock.call_count, 2) + self.assertEqual(self.sleep_mock.call_args_list[0][0][0], 60) + self.assertEqual(self.sleep_mock.call_args_list[1][0][0], 120) + + self.assertEqual(request_callback.call_count, 3) + self.assertEqual(request_callback.call_args, mock.call()) + + self.assertEqual(error_callback.call_count, 3) + self.assertIsInstance(error_callback.call_args[0][0], errors.TemporaryNetworkError) + + self.assertEqual(retry_callback.call_count, 2) + self.assertIsInstance(retry_callback.call_args_list[0][0][0], httpx.Response) + self.assertEqual(retry_callback.call_args_list[0][0][1], 60) + self.assertEqual(retry_callback.call_args_list[1][0][1], 120) + + @ddt.data( + # status, expect_retry + (300, False), + (400, False), + (408, True), + (429, True), + (500, True), + (501, False), + (502, True), + (503, True), + (504, True), + ) + @ddt.unpack + async def test_retry_codes(self, status_code, expect_retry): + self.respx_mock.get(f"{self.server_url}/file").respond(status_code=status_code) + + async with fhir.FhirClient(self.server_url, [], bearer_token="foo") as server: + with self.assertRaises(errors.NetworkError) as cm: + await server.request("GET", "file", retry_delays=[1]) + + self.assertEqual(self.sleep_mock.call_count, 1 if expect_retry else 0) + self.assertIsInstance( + cm.exception, + errors.TemporaryNetworkError if expect_retry else errors.FatalNetworkError, + ) + @ddt.ddt @mock.patch("cumulus_etl.fhir.fhir_auth.uuid.uuid4", new=lambda: "1234") diff --git a/tests/inliner/__init__.py b/tests/inliner/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/inliner/test_inline_cli.py b/tests/inliner/test_inline_cli.py new file mode 100644 index 00000000..e84392d0 --- /dev/null +++ b/tests/inliner/test_inline_cli.py @@ -0,0 +1,82 @@ +"""Tests for inliner/cli.py""" + +import ddt + +from cumulus_etl import cli, errors, fhir, store +from tests import utils + + +@ddt.ddt +class TestInlinerCli(utils.AsyncTestCase, utils.FhirClientMixin): + """Tests for the inline CLI""" + + def setUp(self): + super().setUp() + self.inliner = self.patch("cumulus_etl.inliner.inliner") + + async def run_inline(self, *args) -> None: + await cli.main( + [ + "inline", + "/bogus/path", + self.fhir_url, + f"--smart-client-id={self.fhir_client_id}", + f"--smart-key={self.fhir_jwks_path}", + *args, + ] + ) + + async def test_args_pass_down(self): + await self.run_inline( + "--resource=diagnosticreport", + "--mimetype=a", + "--mimetype=B,C", + ) + self.assertEqual(self.inliner.call_count, 1) + self.assertIsInstance(self.inliner.call_args[0][0], fhir.FhirClient) + self.assertEqual(self.inliner.call_args[0][1].path, "/bogus/path") + self.assertEqual(self.inliner.call_args[0][2], {"DiagnosticReport"}) + self.assertEqual(self.inliner.call_args[0][3], {"a", "b", "c"}) + + @ddt.data( + ("--resource=diagnosticreport,DOCUMENTREFERENCE",), + ("--resource=diagnosticreport", "--resource=DOCUMENTREFERENCE"), + ) + async def test_resources_combined(self, args): + await self.run_inline(*args) + self.assertEqual(self.inliner.call_count, 1) + self.assertEqual(self.inliner.call_args[0][2], {"DiagnosticReport", "DocumentReference"}) + + async def test_defaults(self): + await self.run_inline() + self.assertEqual(self.inliner.call_count, 1) + self.assertEqual(self.inliner.call_args[0][2], {"DiagnosticReport", "DocumentReference"}) + self.assertEqual( + self.inliner.call_args[0][3], {"text/plain", "text/html", "application/xhtml+xml"} + ) + + async def test_sets_fs_options(self): + await self.run_inline("--s3-region=us-west-1") + self.assertEqual( + store.get_fs_options("s3"), + { + "client_kwargs": {"region_name": "us-west-1"}, + "s3_additional_kwargs": {"ServerSideEncryption": "aws:kms"}, + }, + ) + + async def test_bad_resource(self): + self.respx_mock.stop(quiet=True) # silence "not all mocks called" + with self.assert_fatal_exit(errors.ARGS_INVALID): + await self.run_inline("--resource=patient") + + async def test_bad_url(self): + self.respx_mock.stop(quiet=True) # silence "not all mocks called" + with self.assert_fatal_exit(errors.ARGS_INVALID): + await cli.main( + [ + "inline", + "/bogus/path", + "/bogus/url", + ] + ) diff --git a/tests/inliner/test_inliner.py b/tests/inliner/test_inliner.py new file mode 100644 index 00000000..9471ddfc --- /dev/null +++ b/tests/inliner/test_inliner.py @@ -0,0 +1,252 @@ +"""Tests for inliner/inliner.py""" + +import os +from unittest import mock + +import cumulus_fhir_support +import ddt + +from cumulus_etl import common, errors, inliner, store +from tests import s3mock, utils + + +class TestInlinerBase(utils.AsyncTestCase): + """Base class for the inline tests""" + + def setUp(self): + super().setUp() + + self.input_dir = self.make_tempdir() + + self.requester = self.patch("cumulus_etl.fhir.request_attachment") + self.requester.return_value = utils.make_response(text="hello") + + async def run_inline(self, resources=None, mimetypes=None) -> None: + resources = resources or ["DiagnosticReport", "DocumentReference"] + mimetypes = mimetypes or ["text/html"] + await inliner.inliner(mock.MagicMock(), store.Root(self.input_dir), resources, mimetypes) + + @staticmethod + def make_attachment( + *, + url: str | None = None, + content_type: str | None = None, + data: str | None = None, + size: int | None = None, + sha1: str | None = None, + ) -> dict: + attachment = {} + if url: + attachment["url"] = url + if content_type: + attachment["contentType"] = content_type + if data: + attachment["data"] = data + if size: + attachment["size"] = size + if sha1: + attachment["hash"] = sha1 + return attachment + + @staticmethod + def make_dx_report(*attachments) -> dict: + return { + "resourceType": "DiagnosticReport", + "presentedForm": [*attachments], + } + + @staticmethod + def make_docref(*attachments) -> dict: + if not attachments: + return {"resourceType": "DocumentReference"} + return { + "resourceType": "DocumentReference", + "content": [{"attachment": attachment} for attachment in attachments], + } + + +@ddt.ddt +class TestInliner(TestInlinerBase): + async def test_happy_path(self): + """Test a bunch of different scenarios being handled correctly""" + with common.NdjsonWriter(f"{self.input_dir}/docrefs.jsonl") as writer: + # No attachments + writer.write(self.make_docref()) + # Already has inlined data + writer.write( + self.make_docref( + self.make_attachment(url="Binary/x", data="xxx", content_type="text/html") + ), + ) + # Has neither url nor data + writer.write(self.make_docref(self.make_attachment(content_type="text/html"))) + # Does not specify a content type (ignored currently) + writer.write(self.make_docref(self.make_attachment(url="Binary/h"))) + # Not even a docref! Not a normal thing to happen + writer.write({"resourceType": "Patient"}) + # Several attachments, with some different, some same content types + writer.write( + self.make_docref( + self.make_attachment(url="Binary/h1", content_type="text/html"), + self.make_attachment(url="Binary/p", content_type="text/plain"), + # Just a second one, to confirm we *do* handle multiple attachments, not just the + # first one. Add size/hash too, to confirm they are overridden. + self.make_attachment( + url="Binary/h2", content_type="text/html", size=1, sha1="x" + ), + ) + ) + with common.NdjsonWriter(f"{self.input_dir}/reports.jsonl") as writer: + # Confirm we handle DiagnosticReports too + writer.write( + self.make_dx_report( + self.make_attachment(url="Binary/dxr1", content_type="text/html"), + self.make_attachment(url="Binary/dxr2", content_type="text/html"), + ) + ) + + await self.run_inline() + + self.assertEqual(["docrefs.jsonl", "reports.jsonl"], os.listdir(self.input_dir)) + + docref_rows = list( + cumulus_fhir_support.read_multiline_json(f"{self.input_dir}/docrefs.jsonl") + ) + self.assertEqual( + docref_rows, + [ + # These first few are left alone + self.make_docref(), + self.make_docref( + self.make_attachment(url="Binary/x", data="xxx", content_type="text/html"), + ), + self.make_docref(self.make_attachment(content_type="text/html")), + self.make_docref(self.make_attachment(url="Binary/h")), + {"resourceType": "Patient"}, + # OK here we do handle a few + self.make_docref( + self.make_attachment( + url="Binary/h1", + content_type="text/html; charset=utf-8", + data="aGVsbG8=", + size=5, + sha1="qvTGHdzF6KLavt4PO0gs2a6pQ00=", + ), + self.make_attachment(url="Binary/p", content_type="text/plain"), + self.make_attachment( + url="Binary/h2", + content_type="text/html; charset=utf-8", + data="aGVsbG8=", + size=5, + sha1="qvTGHdzF6KLavt4PO0gs2a6pQ00=", + ), + ), + ], + ) + + report_rows = list( + cumulus_fhir_support.read_multiline_json(f"{self.input_dir}/reports.jsonl") + ) + self.assertEqual( + report_rows, + [ + # OK here we do handle a few + self.make_dx_report( + self.make_attachment( + url="Binary/dxr1", + content_type="text/html; charset=utf-8", + data="aGVsbG8=", + size=5, + sha1="qvTGHdzF6KLavt4PO0gs2a6pQ00=", + ), + self.make_attachment( + url="Binary/dxr2", + content_type="text/html; charset=utf-8", + data="aGVsbG8=", + size=5, + sha1="qvTGHdzF6KLavt4PO0gs2a6pQ00=", + ), + ), + ], + ) + + async def test_graceful_errors(self): + """Verify that we don't bail if a network error happens""" + + def requester(_client, attachment): + if attachment["url"].endswith("/501"): + raise errors.FatalNetworkError("fatal", None) + if attachment["url"].endswith("/502"): + raise errors.TemporaryNetworkError("temp", None) + return utils.make_response(text="bye") + + self.requester.side_effect = requester + + with common.NdjsonWriter(f"{self.input_dir}/docrefs.ndjson") as writer: + writer.write( + self.make_docref( + self.make_attachment(url="Binary/501", content_type="text/plain"), + self.make_attachment(url="Binary/502", content_type="text/plain"), + self.make_attachment(url="Binary/valid", content_type="text/plain"), + ) + ) + + await self.run_inline(mimetypes=["text/plain"]) + + docref_rows = list( + cumulus_fhir_support.read_multiline_json(f"{self.input_dir}/docrefs.ndjson") + ) + self.assertEqual( + docref_rows, + [ + self.make_docref( + self.make_attachment(url="Binary/501", content_type="text/plain"), + self.make_attachment(url="Binary/502", content_type="text/plain"), + self.make_attachment( + url="Binary/valid", + content_type="text/plain; charset=utf-8", + data="Ynll", + size=3, + sha1="eMmlPi8otUPqYsgmas/fNtXGPmE=", + ), + ), + ], + ) + + +class TestInlinerOnS3(TestInlinerBase, s3mock.S3Mixin): + def setUp(self): + super().setUp() + self.input_dir = self.bucket_url + + async def test_inline_on_s3(self): + """Quick test that we can read from an arbitrary input dir using fsspec""" + with common.NdjsonWriter(f"{self.bucket_url}/docrefs.ndjson") as writer: + writer.write( + self.make_docref( + self.make_attachment(url="Binary/valid", content_type="text/custom"), + ) + ) + + await self.run_inline(mimetypes=["text/custom"]) + + docref_rows = list( + cumulus_fhir_support.read_multiline_json( + f"{self.bucket_url}/docrefs.ndjson", + fsspec_fs=self.s3fs, + ) + ) + self.assertEqual( + docref_rows, + [ + self.make_docref( + self.make_attachment( + url="Binary/valid", + content_type="text/custom; charset=utf-8", + data="aGVsbG8=", + size=5, + sha1="qvTGHdzF6KLavt4PO0gs2a6pQ00=", + ), + ), + ], + ) diff --git a/tests/inliner/test_reader.py b/tests/inliner/test_reader.py new file mode 100644 index 00000000..3ee88512 --- /dev/null +++ b/tests/inliner/test_reader.py @@ -0,0 +1,99 @@ +"""Tests for inliner/reader.py""" + +import asyncio + +from cumulus_etl import errors +from cumulus_etl.inliner import reader +from tests import utils + + +class TestReader(utils.AsyncTestCase): + @staticmethod + def get_task_names() -> set[str]: + return { + task.get_name() + for task in asyncio.all_tasks() + if task.get_name().startswith("peek-ahead-") + } + + async def test_parallelism(self): + events = [asyncio.Event() for _i in range(5)] + called = [asyncio.Event() for _i in range(5)] + finished = [asyncio.Event() for _i in range(5)] + + async def processor(index: int, event) -> None: + called[index].set() + await event.wait() + finished[index].set() + + peek_task = asyncio.create_task(reader.peek_ahead_processor(events, processor, peek_at=2)) + + # Wait for first two worker calls to be made (and confirm the last worker isn't called) + await asyncio.wait_for(called[0].wait(), 1) + await asyncio.wait_for(called[1].wait(), 1) + self.assertFalse(called[2].is_set()) + self.assertFalse(called[3].is_set()) + self.assertFalse(called[4].is_set()) + self.assertFalse(finished[0].is_set()) + self.assertFalse(finished[1].is_set()) + self.assertFalse(finished[2].is_set()) + self.assertFalse(finished[3].is_set()) + self.assertFalse(finished[4].is_set()) + self.assertFalse(peek_task.done()) + + # Confirm we have the full set of expected tasks + self.assertEqual( + self.get_task_names(), + {"peek-ahead-reader", "peek-ahead-worker-0", "peek-ahead-worker-1"}, + ) + + # Release one worker, and confirm we then grab the next one. + events[0].set() + await asyncio.wait_for(finished[0].wait(), 1) + await asyncio.wait_for(called[2].wait(), 1) + self.assertFalse(called[3].is_set()) + self.assertFalse(called[4].is_set()) + self.assertFalse(finished[1].is_set()) + self.assertFalse(finished[2].is_set()) + self.assertFalse(finished[3].is_set()) + self.assertFalse(finished[4].is_set()) + self.assertFalse(peek_task.done()) + + # Confirm that the read worker has finished (it reads 2 ahead of the consumers, so it + # will have been able to read all 5 inputs now) + async def no_more_reader(): + while "peek-ahead-reader" in self.get_task_names(): + await asyncio.sleep(0) + + await asyncio.wait_for(no_more_reader(), 1) + self.assertEqual(self.get_task_names(), {"peek-ahead-worker-0", "peek-ahead-worker-1"}) + + # Release the next worker + events[2].set() + await asyncio.wait_for(finished[2].wait(), 1) + await asyncio.wait_for(called[3].wait(), 1) + self.assertFalse(called[4].is_set()) + self.assertFalse(finished[1].is_set()) + self.assertFalse(finished[3].is_set()) + self.assertFalse(finished[4].is_set()) + self.assertFalse(peek_task.done()) + + # Release the remaining workers + events[1].set() + events[3].set() + events[4].set() + await asyncio.wait_for(finished[1].wait(), 1) + await asyncio.wait_for(finished[3].wait(), 1) + await asyncio.wait_for(finished[4].wait(), 1) + await peek_task + self.assertTrue(peek_task.done()) + + # Confirm all tasks have finished + self.assertEqual(self.get_task_names(), set()) + + async def test_worker_exception(self): + async def processor(_index: int, _item: int) -> None: + raise ValueError("boom") + + with self.assert_fatal_exit(errors.INLINE_TASK_FAILED): + await reader.peek_ahead_processor([1, 2, 3, 4], processor, peek_at=2) diff --git a/tests/inliner/test_writer.py b/tests/inliner/test_writer.py new file mode 100644 index 00000000..78610016 --- /dev/null +++ b/tests/inliner/test_writer.py @@ -0,0 +1,31 @@ +"""Tests for inliner/writer.py""" + +import cumulus_fhir_support + +from cumulus_etl.inliner import writer +from tests import utils + + +class TestWriter(utils.AsyncTestCase): + def test_basic_writing(self): + tmpdir = self.make_tempdir() + with writer.OrderedNdjsonWriter(f"{tmpdir}/test.ndjson") as ordered_writer: + ordered_writer.write(1, {"id": "one"}) + ordered_writer.write(0, {"id": "zero"}) + ordered_writer.write(5, {"id": "five"}) + ordered_writer.write(4, {"id": "four"}) + ordered_writer.write(2, {"id": "two"}) + ordered_writer.write(3, {"id": "three"}) + + rows = list(cumulus_fhir_support.read_multiline_json(f"{tmpdir}/test.ndjson")) + self.assertEqual( + rows, + [ + {"id": "zero"}, + {"id": "one"}, + {"id": "two"}, + {"id": "three"}, + {"id": "four"}, + {"id": "five"}, + ], + ) diff --git a/tests/loaders/ndjson/test_bulk_export.py b/tests/loaders/ndjson/test_bulk_export.py index 6be8ea88..4393eb42 100644 --- a/tests/loaders/ndjson/test_bulk_export.py +++ b/tests/loaders/ndjson/test_bulk_export.py @@ -484,7 +484,7 @@ async def test_file_download_error(self): }, ) self.respx_mock.get("https://example.com/con1").respond( - status_code=501, content=b'["error"]' + status_code=502, content=b'["error"]' ) with self.assertRaisesRegex( @@ -510,8 +510,9 @@ async def test_file_download_error(self): { "fileUrl": "https://example.com/con1", "body": '["error"]', - "code": 501, - "message": 'An error occurred when connecting to "https://example.com/con1": ["error"]', + "code": 502, + "message": 'An error occurred when connecting to "https://example.com/con1": ' + '["error"]', "responseHeaders": {"content-length": "9"}, }, ), @@ -587,54 +588,34 @@ async def test_delay(self): """Verify that we wait the amount of time the server asks us to""" self.mock_kickoff( side_effect=[ - # Before returning a successful kickoff, pause for an hour + # Before returning a successful kickoff, pause for a minute (1h is capped to 1m) respx.MockResponse(status_code=429, headers={"Retry-After": "3600"}), respx.MockResponse( status_code=202, headers={"Content-Location": "https://example.com/poll"} ), ] ) - self.respx_mock.get("https://example.com/poll").side_effect = [ - # default of one minute - respx.MockResponse(status_code=429, headers={"X-Progress": "chill"}, content=b"{}"), + self.respx_mock.get("https://example.com/poll").respond( # five hours (though 202 responses will get limited to five min) - respx.MockResponse(status_code=202, headers={"Retry-After": "18000"}, content=b"..."), - # 23 hours (putting us over a day) - respx.MockResponse( - status_code=429, headers={"Retry-After": "82800", "X-Progress": "plz wait"} - ), - ] + status_code=202, + headers={"Retry-After": "18000", "X-Progress": "chill"}, + content=b"...", + ) with self.assertRaisesRegex(errors.FatalError, "Timed out waiting"): await self.export() - # 86760 == 24 hours + six minutes - self.assertEqual(86760, self.exporter._total_wait_time) + # 2592060 == 30 days + one minute + self.assertEqual(2592060, self.exporter._total_wait_time) self.assertListEqual( [ - mock.call(3600), mock.call(60), mock.call(300), - mock.call(82800), + mock.call(300), + mock.call(300), ], - self.sleep_mock.call_args_list, - ) - - self.assert_log_equals( - ("kickoff", None), - ("status_progress", {"body": {}, "xProgress": "chill", "retryAfter": None}), - ("status_progress", {"body": "...", "xProgress": None, "retryAfter": "18000"}), - ("status_progress", {"body": "", "xProgress": "plz wait", "retryAfter": "82800"}), - ( - "status_error", - { - "body": None, - "code": None, - "message": "Timed out waiting for the bulk FHIR export to finish.", - "responseHeaders": None, - }, - ), + self.sleep_mock.call_args_list[:4], ) async def test_no_delete_if_interrupted(self): @@ -730,7 +711,7 @@ async def test_retry_status_poll_then_failure(self): """Verify that we retry polling the status URL on server errors""" self.mock_kickoff() self.respx_mock.get("https://example.com/poll").respond( - status_code=500, + status_code=503, content=b"Test Status Call Failed", ) @@ -756,13 +737,13 @@ async def test_retry_status_poll_then_success(self): self.mock_delete() self.respx_mock.get("https://example.com/poll").mock( side_effect=[ - httpx.Response(500), - httpx.Response(500), + httpx.Response(429), + httpx.Response(408), httpx.Response(202, json={}), - httpx.Response(500), - httpx.Response(500, headers={"Retry-After": "20"}), - httpx.Response(500), - httpx.Response(500), + httpx.Response(504), + httpx.Response(502, headers={"Retry-After": "20"}), + httpx.Response(503), + httpx.Response(504), httpx.Response(200, json={}), ], ) diff --git a/tests/loaders/ndjson/test_ndjson_loader.py b/tests/loaders/ndjson/test_ndjson_loader.py index 705c2b0d..c428fcfb 100644 --- a/tests/loaders/ndjson/test_ndjson_loader.py +++ b/tests/loaders/ndjson/test_ndjson_loader.py @@ -353,6 +353,35 @@ async def test_export_to_folder_not_local(self): await loader.load_resources(set()) self.assertEqual(cm.exception.code, errors.BULK_EXPORT_FOLDER_NOT_LOCAL) + async def test_inlining_but_no_export_to(self): + """Verify we fail if an export folder is not set when inlining""" + loader = loaders.FhirNdjsonLoader( + store.Root("http://localhost:9999"), + mock.AsyncMock(), + inline=True, + ) + with self.assertRaises(SystemExit) as cm: + await loader.load_resources(set()) + self.assertEqual(cm.exception.code, errors.INLINE_WITHOUT_FOLDER) + + @mock.patch("cumulus_etl.inliner.inliner") + async def test_inlining(self, mock_inliner): + """Verify we inline if asked""" + tmpdir = self.make_tempdir() + loader = loaders.FhirNdjsonLoader( + store.Root("http://localhost:9999"), + mock.AsyncMock(), + export_to=tmpdir, + inline=True, + inline_mimetypes={"a/b"}, + inline_resources={"DocumentReference"}, + ) + await loader.load_resources({"Patient"}) + self.assertEqual(mock_inliner.call_count, 1) + self.assertEqual(mock_inliner.call_args[0][1].path, tmpdir) + self.assertEqual(mock_inliner.call_args[0][2], {"DocumentReference"}) + self.assertEqual(mock_inliner.call_args[0][3], {"a/b"}) + async def test_reads_deleted_ids(self): """Verify we read in the deleted/ folder""" with tempfile.TemporaryDirectory() as tmpdir: diff --git a/tests/test_common.py b/tests/test_common.py index 07652c3f..54f16b98 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -14,9 +14,9 @@ @ddt.ddt -class TestLogging(utils.AsyncTestCase): +class TestHelpers(utils.AsyncTestCase): """ - Test case for common logging methods. + Test case for common helper methods. """ @ddt.data( @@ -41,6 +41,11 @@ def test_human_time_offset(self, seconds, expected_str): """Verify human_time_offset works correctly""" self.assertEqual(expected_str, common.human_time_offset(seconds)) + def test_temp_dir_requires_init(self): + common.set_global_temp_dir(None) # reset the global temp dir + with self.assertRaisesRegex(ValueError, "No temporary directory was created yet"): + common.get_temp_dir("blarg") + @ddt.ddt class TestIOUtils(s3mock.S3Mixin, utils.AsyncTestCase): diff --git a/tests/utils.py b/tests/utils.py index cb79847a..fc8bb48d 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -82,6 +82,13 @@ def patch_object(self, *args, **kwargs) -> mock.Mock: self.addCleanup(patcher.stop) return patcher.start() + @contextlib.contextmanager + def assert_fatal_exit(self, code: int | None = None): + with self.assertRaises(SystemExit) as cm: + yield + if code is not None: + self.assertEqual(cm.exception.code, code) + async def _catch_system_exit(self, method): try: ret = method()