Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: don't ignore schemas of contained resources #2

Merged
merged 1 commit into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cumulus_fhir_support/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""FHIR support code for the Cumulus project"""

__version__ = "1.0.0"
__version__ = "1.1.0"

from .schemas import pyarrow_schema_from_rows
59 changes: 55 additions & 4 deletions cumulus_fhir_support/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ def pyarrow_schema_from_rows(resource_type: str, rows: Iterable[dict] = None) ->
:param rows: optionally a set of JSON FHIR resources to ensure are covered by the schema
:returns: a PyArrow schema that covers the unified shape of all provided rows
"""
rows = list(rows or [])

# Examine batch to see the full shape of it, in order to detect any deeply nested fields
# that we want to make sure to include in the final schema (normally, we go wide but only as
# deep as we need to)
batch_shape = _get_shape_of_dicts(None, rows and list(rows))
batch_shape = _get_shape_of_dicts(None, rows)

return _create_pyarrow_schema_for_resource(resource_type, batch_shape)
schema = _create_pyarrow_schema_for_resource(resource_type, batch_shape)
schema = _include_contained_schemas(schema, rows, batch_shape)
return schema


def _get_shape_of_dicts(total_shape: Optional[dict], item: Any) -> dict:
Expand Down Expand Up @@ -79,7 +83,53 @@ def _get_shape_of_dicts(total_shape: Optional[dict], item: Any) -> dict:
return total_shape


def _create_pyarrow_schema_for_resource(resource_type: str, batch_shape: dict) -> pyarrow.Schema:
def _include_contained_schemas(
schema: pyarrow.Schema, rows: list[dict], batch_shape: dict
) -> pyarrow.Schema:
"""
This will include all contained resource schemas into one big contained schema.

Specifically, any field found in the shape of the "contained" field will be included,
as long as any resource in the contained list (detected via "resourceType") has the field.

Also see https://github.com/smart-on-fhir/cumulus-etl/issues/250 for discussion
of whether it is wise to just comingle the schemas like this.
"""
# Grab all contained resource types that we have in the source data,
# which will inform the expected schema inside there.
contained_types = sorted(
filter(
None,
{
contained_obj.get("resourceType")
for row in rows
for contained_obj in row.get("contained", [])
},
)
)
if not contained_types:
return schema # no need to do anything
contained_shape = batch_shape.get("contained")

# Allow any found fields in any of the contained types
fields = {}
for contained_type in contained_types:
subschema = _create_pyarrow_schema_for_resource(contained_type, contained_shape, wide=False)
for name in subschema.names:
fields[name] = subschema.field(name) # will overwrite previous field of same name
fields = [fields[name] for name in sorted(fields)] # sort for a consistent order

contained_index = schema.get_field_index("contained")
schema = schema.remove(contained_index)
return schema.insert(
contained_index,
pyarrow.field("contained", pyarrow.list_(pyarrow.struct(fields))),
)


def _create_pyarrow_schema_for_resource(
resource_type: str, batch_shape: dict, wide: bool = True
) -> pyarrow.Schema:
"""
Creates a PyArrow schema based off the named resource (like 'Observation').

Expand All @@ -93,8 +143,9 @@ def _create_pyarrow_schema_for_resource(resource_type: str, batch_shape: dict) -
# fhirclient doesn't include `resourceType` in the list of properties. So do that manually.
type_field = pyarrow.field("resourceType", pyarrow.string())

level = 0 if wide else 2
return pyarrow.schema(
[type_field, *_fhir_obj_to_pyarrow_fields(instance, batch_shape, level=0)]
[type_field, *_fhir_obj_to_pyarrow_fields(instance, batch_shape, level=level)]
)


Expand Down
38 changes: 38 additions & 0 deletions tests/test_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,41 @@ def test_non_spec_field_are_ignored(self):
schema = support.pyarrow_schema_from_rows("Observation", rows)

self.assertNotIn("invalid_field", schema.names)

def test_contained_resources(self):
"""Verify that we include contained schemas"""
# Also see https://github.com/smart-on-fhir/cumulus-etl/issues/250 for discussion
# of whether it is wise to just comingle the schema like this.
rows = [
{
"contained": [
{"resourceType": "Medication", "code": {"text": "aspirin"}},
{"resourceType": "Patient", "gender": "unknown", "extraField": False},
{"unknownField": True, "gender": 3},
]
}
]
schema = support.pyarrow_schema_from_rows("MedicationRequest", rows)
contained_type = schema.field("contained").type.value_type

# Not-mentioned fields aren't present, but mentioned ones are

# Medication
self.assertEqual(pyarrow.string(), contained_type.field("code").type.field("text").type)
self.assertEqual(-1, contained_type.get_field_index("status"))

# Patient
self.assertEqual(pyarrow.string(), contained_type.field("gender").type)
self.assertEqual(-1, contained_type.get_field_index("birthDate"))

# Extra fields
self.assertEqual(-1, contained_type.get_field_index("extraField"))
self.assertEqual(-1, contained_type.get_field_index("unknownField"))

def test_contained_resources_empty(self):
"""Verify that we leave basic schema in there if no contained resources"""
schema = support.pyarrow_schema_from_rows("AllergyIntolerance")
contained_type = schema.field("contained").type.value_type
self.assertEqual(pyarrow.string(), contained_type.field("id").type)
self.assertEqual(pyarrow.string(), contained_type.field("implicitRules").type)
self.assertEqual(pyarrow.string(), contained_type.field("language").type)
Loading