Skip to content

Commit

Permalink
Revert "Reverted atomic extract changes"
Browse files Browse the repository at this point in the history
This reverts commit 2ce7145.
  • Loading branch information
ruscoder committed Feb 26, 2025
1 parent 52417cd commit 3368d36
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 33 deletions.
87 changes: 54 additions & 33 deletions app/sdc/extract.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,68 @@
from aiohttp import ClientSession, web
from funcy.seqs import flatten


async def external_service_extraction(client, service, template, context):
async with ClientSession() as session:
async with session.post(
service,
json={
"template": template,
"context": context,
},
) as result:
if 200 <= result.status <= 299:
bundle = await result.json()
return await client.execute("/", data=bundle)
else:
raise web.HTTPBadRequest(body=await result.text())
async def get_external_service_bundle(session, service, template, context):
async with session.post(
service,
json={
"template": template,
"context": context,
},
) as result:
if 200 <= result.status <= 299:
return await result.json()
else:
raise web.HTTPBadRequest(body=await result.text())


async def execute_mappers_bundles(client, mappers_bundles):
not_transaction = any(bundle.get("type") != "transaction" for bundle in mappers_bundles)

result_bundle = {
"resourceType": "Bundle",
"type": "batch" if not_transaction else "transaction",
"entry": list(flatten(bundle["entry"] for bundle in mappers_bundles)),
}

return await client.execute("/", data=result_bundle)


async def extract(client, mappings, context, extract_services):
"""
mappings could be a list of Aidbox Mapping resources
or plain jute templates
"""
resp = []

for mapper in mappings:
if "resourceType" in mapper and "body" in mapper:
# It is custome mapper resource
mapper_type = mapper.get("type", "JUTE")
if mapper_type == "JUTE" and extract_services["JUTE"] == "aidbox":
# Aidbox native extraction
resp.append(await mapper.execute("$apply", data=context))
async with ClientSession() as session:
resp = []
mappers_bundles = []

for mapper in mappings:
if "resourceType" in mapper and "body" in mapper:
# It is custome mapper resource
mapper_type = mapper.get("type", "JUTE")
if mapper_type == "JUTE" and extract_services["JUTE"] == "aidbox":
mapper_bundle = await mapper.execute("$debug", data=context)
if "entry" not in mapper_bundle:
continue

mappers_bundles.append(mapper_bundle)
else:
# Use 3rd party service FHIRPathMapping or JUTE
mappers_bundles.append(
await get_external_service_bundle(
session, extract_services[mapper_type], mapper["body"], context
)
)
else:
# Use 3rd party service FHIRPathMapping or JUTE
resp.append(
await external_service_extraction(
client, extract_services[mapper_type], mapper["body"], context
# legacy extraction
mappers_bundles.append(
await get_external_service_bundle(
session, extract_services["JUTE"], mapper, context
)
)
else:
# legacy extraction
resp.append(
await external_service_extraction(client, extract_services["JUTE"], mapper, context)
)

return resp
if len(mappers_bundles) > 0:
resp.append(await execute_mappers_bundles(client, mappers_bundles))

return resp
176 changes: 176 additions & 0 deletions tests/sdc/test_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,3 +758,179 @@ async def test_fhir_extract_using_list_endpoint_fails_because_of_constraint_chec
Questionnaire=q.serialize(), QuestionnaireResponse=qr
),
)


MULTIPLE_MAPPERS_TEST_DATA = {
"m1_data": {
"body": {
"resourceType": "Bundle",
"type": "transaction",
"entry": [
{
"request": {"url": "/Patient", "method": "POST"},
"resource": {
"resourceType": "Patient",
"id": """$ fhirpath("QuestionnaireResponse.item.where(linkId='patientId').answer.children().string").0""",
},
}
],
}
},
"m2_data": {
"body": {
"resourceType": "Bundle",
"type": "transaction",
"entry": [
{
"request": {"url": "/Observation", "method": "POST"},
"resource": {
"resourceType": "Observation",
"code": {
"coding": [
{
"code": """$ fhirpath("QuestionnaireResponse.item.where(linkId='observationCode').answer.children().string").0"""
}
]
},
"status": "final",
},
}
],
}
},
"m3_data": {
"body": {
"resourceType": "Bundle",
"type": "transaction",
"entry": [
{
"request": {"url": "/Observation", "method": "POST"},
"resource": {
"resourceType": "Observation",
# Wrong data to test that multiple mappers extract is atomic
"code": """$ fhirpath("QuestionnaireResponse.item.where(linkId='observationCode').answer.children().string").0""",
"status": "final",
},
}
],
}
},
"patientId": "newPatient",
"observationCode": "obs1",
}


@pytest.mark.asyncio
async def test_fce_extract_multiple_mappers(aidbox_client, safe_db):
m1 = aidbox_client.resource(
"Mapping",
**MULTIPLE_MAPPERS_TEST_DATA["m1_data"],
)

m2 = aidbox_client.resource(
"Mapping",
**MULTIPLE_MAPPERS_TEST_DATA["m2_data"],
)

await m1.save()
await m2.save()

q = aidbox_client.resource(
"Questionnaire",
**{
"status": "active",
"mapping": [
{"resourceType": "Mapping", "id": m1.id},
{"resourceType": "Mapping", "id": m2.id},
],
"item": [
{"type": "string", "linkId": "patientId"},
{"type": "string", "linkId": "observationCode"},
],
},
)
await q.save()

patientId = MULTIPLE_MAPPERS_TEST_DATA["patientId"]
observationCode = MULTIPLE_MAPPERS_TEST_DATA["observationCode"]

qr = aidbox_client.resource(
"QuestionnaireResponse",
**{
"questionnaire": q.id,
"item": [
{"linkId": "patientId", "answer": [{"value": {"string": patientId}}]},
{"linkId": "observationCode", "answer": [{"value": {"string": observationCode}}]},
],
},
)

extraction = await q.execute("$extract", data=qr)

assert extraction[0]["resourceType"] == "Bundle"
assert len(extraction[0]["entry"]) == 2

p = await aidbox_client.resources("Patient").search(id=patientId).fetch_all()
o = await aidbox_client.resources("Observation").search(code=observationCode).fetch_all()

assert len(p) == 1
assert len(o) == 1

assert p[0].id == patientId
assert o[0].code["coding"][0]["code"] == observationCode
assert o[0].status == "final"


@pytest.mark.asyncio
async def test_fce_extract_multiple_mappers_is_atomic(aidbox_client, safe_db):
m1 = aidbox_client.resource(
"Mapping",
**MULTIPLE_MAPPERS_TEST_DATA["m1_data"],
)

m2 = aidbox_client.resource(
"Mapping",
**MULTIPLE_MAPPERS_TEST_DATA["m3_data"],
)

await m1.save()
await m2.save()

q = aidbox_client.resource(
"Questionnaire",
**{
"status": "active",
"mapping": [
{"resourceType": "Mapping", "id": m1.id},
{"resourceType": "Mapping", "id": m2.id},
],
"item": [
{"type": "string", "linkId": "patientId"},
{"type": "string", "linkId": "observationCode"},
],
},
)
await q.save()

patientId = MULTIPLE_MAPPERS_TEST_DATA["patientId"]
observationCode = MULTIPLE_MAPPERS_TEST_DATA["observationCode"]

qr = aidbox_client.resource(
"QuestionnaireResponse",
**{
"questionnaire": q.id,
"item": [
{"linkId": "patientId", "answer": [{"value": {"string": patientId}}]},
{"linkId": "observationCode", "answer": [{"value": {"string": observationCode}}]},
],
},
)

with pytest.raises(OperationOutcome):
await q.execute("$extract", data=qr)

p = await aidbox_client.resources("Patient").search(id=patientId).fetch_all()
o = await aidbox_client.resources("Observation").search(code=observationCode).fetch_all()

assert p == []
assert o == []

0 comments on commit 3368d36

Please sign in to comment.