From a02741066649ad2f0e157b2528e11349e9ccf786 Mon Sep 17 00:00:00 2001 From: Michael Terry Date: Wed, 22 Nov 2023 08:53:54 -0500 Subject: [PATCH] fix: don't ignore schemas of contained resources If a field appears in {"contained": [{my resource fields}]}, it will now also appear in the resulting schema. If multiple resources are in the contained list, we'll use all the relevant resource schema to find the column schema for mentioned fields. This is slightly janky, but it's not clear how else to handle this without pulling resources out of the contained list. A project for the future. --- cumulus_fhir_support/__init__.py | 2 +- cumulus_fhir_support/schemas.py | 59 +++++++++++++++++++++++++++++--- tests/test_schemas.py | 38 ++++++++++++++++++++ 3 files changed, 94 insertions(+), 5 deletions(-) diff --git a/cumulus_fhir_support/__init__.py b/cumulus_fhir_support/__init__.py index ad9c09c..adde653 100644 --- a/cumulus_fhir_support/__init__.py +++ b/cumulus_fhir_support/__init__.py @@ -1,5 +1,5 @@ """FHIR support code for the Cumulus project""" -__version__ = "1.0.0" +__version__ = "1.1.0" from .schemas import pyarrow_schema_from_rows diff --git a/cumulus_fhir_support/schemas.py b/cumulus_fhir_support/schemas.py index 3173f21..7b2cdc9 100644 --- a/cumulus_fhir_support/schemas.py +++ b/cumulus_fhir_support/schemas.py @@ -39,12 +39,16 @@ def pyarrow_schema_from_rows(resource_type: str, rows: Iterable[dict] = None) -> :param rows: optionally a set of JSON FHIR resources to ensure are covered by the schema :returns: a PyArrow schema that covers the unified shape of all provided rows """ + rows = list(rows or []) + # Examine batch to see the full shape of it, in order to detect any deeply nested fields # that we want to make sure to include in the final schema (normally, we go wide but only as # deep as we need to) - batch_shape = _get_shape_of_dicts(None, rows and list(rows)) + batch_shape = _get_shape_of_dicts(None, rows) - return _create_pyarrow_schema_for_resource(resource_type, batch_shape) + schema = _create_pyarrow_schema_for_resource(resource_type, batch_shape) + schema = _include_contained_schemas(schema, rows, batch_shape) + return schema def _get_shape_of_dicts(total_shape: Optional[dict], item: Any) -> dict: @@ -79,7 +83,53 @@ def _get_shape_of_dicts(total_shape: Optional[dict], item: Any) -> dict: return total_shape -def _create_pyarrow_schema_for_resource(resource_type: str, batch_shape: dict) -> pyarrow.Schema: +def _include_contained_schemas( + schema: pyarrow.Schema, rows: list[dict], batch_shape: dict +) -> pyarrow.Schema: + """ + This will include all contained resource schemas into one big contained schema. + + Specifically, any field found in the shape of the "contained" field will be included, + as long as any resource in the contained list (detected via "resourceType") has the field. + + Also see https://github.com/smart-on-fhir/cumulus-etl/issues/250 for discussion + of whether it is wise to just comingle the schemas like this. + """ + # Grab all contained resource types that we have in the source data, + # which will inform the expected schema inside there. + contained_types = sorted( + filter( + None, + { + contained_obj.get("resourceType") + for row in rows + for contained_obj in row.get("contained", []) + }, + ) + ) + if not contained_types: + return schema # no need to do anything + contained_shape = batch_shape.get("contained") + + # Allow any found fields in any of the contained types + fields = {} + for contained_type in contained_types: + subschema = _create_pyarrow_schema_for_resource(contained_type, contained_shape, wide=False) + for name in subschema.names: + fields[name] = subschema.field(name) # will overwrite previous field of same name + fields = [fields[name] for name in sorted(fields)] # sort for a consistent order + + contained_index = schema.get_field_index("contained") + schema = schema.remove(contained_index) + return schema.insert( + contained_index, + pyarrow.field("contained", pyarrow.list_(pyarrow.struct(fields))), + ) + + +def _create_pyarrow_schema_for_resource( + resource_type: str, batch_shape: dict, wide: bool = True +) -> pyarrow.Schema: """ Creates a PyArrow schema based off the named resource (like 'Observation'). @@ -93,8 +143,9 @@ def _create_pyarrow_schema_for_resource(resource_type: str, batch_shape: dict) - # fhirclient doesn't include `resourceType` in the list of properties. So do that manually. type_field = pyarrow.field("resourceType", pyarrow.string()) + level = 0 if wide else 2 return pyarrow.schema( - [type_field, *_fhir_obj_to_pyarrow_fields(instance, batch_shape, level=0)] + [type_field, *_fhir_obj_to_pyarrow_fields(instance, batch_shape, level=level)] ) diff --git a/tests/test_schemas.py b/tests/test_schemas.py index c78afba..aafa9d3 100644 --- a/tests/test_schemas.py +++ b/tests/test_schemas.py @@ -181,3 +181,41 @@ def test_non_spec_field_are_ignored(self): schema = support.pyarrow_schema_from_rows("Observation", rows) self.assertNotIn("invalid_field", schema.names) + + def test_contained_resources(self): + """Verify that we include contained schemas""" + # Also see https://github.com/smart-on-fhir/cumulus-etl/issues/250 for discussion + # of whether it is wise to just comingle the schema like this. + rows = [ + { + "contained": [ + {"resourceType": "Medication", "code": {"text": "aspirin"}}, + {"resourceType": "Patient", "gender": "unknown", "extraField": False}, + {"unknownField": True, "gender": 3}, + ] + } + ] + schema = support.pyarrow_schema_from_rows("MedicationRequest", rows) + contained_type = schema.field("contained").type.value_type + + # Not-mentioned fields aren't present, but mentioned ones are + + # Medication + self.assertEqual(pyarrow.string(), contained_type.field("code").type.field("text").type) + self.assertEqual(-1, contained_type.get_field_index("status")) + + # Patient + self.assertEqual(pyarrow.string(), contained_type.field("gender").type) + self.assertEqual(-1, contained_type.get_field_index("birthDate")) + + # Extra fields + self.assertEqual(-1, contained_type.get_field_index("extraField")) + self.assertEqual(-1, contained_type.get_field_index("unknownField")) + + def test_contained_resources_empty(self): + """Verify that we leave basic schema in there if no contained resources""" + schema = support.pyarrow_schema_from_rows("AllergyIntolerance") + contained_type = schema.field("contained").type.value_type + self.assertEqual(pyarrow.string(), contained_type.field("id").type) + self.assertEqual(pyarrow.string(), contained_type.field("implicitRules").type) + self.assertEqual(pyarrow.string(), contained_type.field("language").type)