wrap research_data row addition in critical section (#4412)
wrap research_data row addition in a critical section, keyed on the unique
constraint, as we do on rare occasions experience asynchronous collisions.

this is intended to prevent exceptions such as the following, seen once
during the initial population of 100K rows:

```
Unexpected exception in `cache_research_data_task` on 231 : (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "ix_research_data_questionnaire_response_id"
DETAIL:  Key (questionnaire_response_id)=(97480) already exists.
```

Co-authored-by: Ivan Cvitkovic <[email protected]>
pbugni and ivan-c authored Oct 14, 2024
1 parent 6161b55 commit 8cd97f0
Showing 1 changed file with 43 additions and 39 deletions.
portal/models/research_data.py: 43 additions & 39 deletions

```diff
@@ -97,46 +97,50 @@ def add_questionnaire_response(questionnaire_response, research_study_id):
     """
     from .qb_timeline import qb_status_visit_name
 
+    from ..timeout_lock import TimeoutLock
     # TN-3250, don't include QNRs without assigned visits, i.e. qb_id > 0
     if not questionnaire_response.questionnaire_bank_id:
         return
 
-    instrument = questionnaire_response.document['questionnaire']['reference'].split('/')[-1]
-    if research_study_id is None:
-        research_study_id = research_study_id_from_questionnaire(instrument)
-
-    patient_fields = ("careProvider", "identifier")
-    document = questionnaire_response.document_answered.copy()
-    subject = questionnaire_response.subject
-    document['encounter'] = questionnaire_response.encounter.as_fhir()
-    document["subject"] = {
-        k: v for k, v in subject.as_fhir().items() if k in patient_fields
-    }
-
-    if subject.organizations:
-        providers = []
-        for org in subject.organizations:
-            org_ref = Reference.organization(org.id).as_fhir()
-            identifiers = [i.as_fhir() for i in org.identifiers if i.system == "http://pcctc.org/"]
-            if identifiers:
-                org_ref['identifier'] = identifiers
-            providers.append(org_ref)
-        document["subject"]["careProvider"] = providers
-
-    qb_status = qb_status_visit_name(
-        subject.id,
-        research_study_id,
-        FHIR_datetime.parse(questionnaire_response.document['authored']))
-    document["timepoint"] = qb_status['visit_name']
-
-    research_data = ResearchData(
-        subject_id=subject.id,
-        questionnaire_response_id=questionnaire_response.id,
-        instrument=instrument,
-        research_study_id=research_study_id,
-        authored=FHIR_datetime.parse(document['authored']),
-        data=document
-    )
-    db.session.add(research_data)
-    db.session.commit()
+    # Asynchronous requests, look out for threads updating the same subject, QNR
+    key = f"add_research_data.{questionnaire_response.subject_id}:{questionnaire_response.id}"
+    with TimeoutLock(key, expires=60, timeout=60):
+        instrument = questionnaire_response.document['questionnaire']['reference'].split('/')[-1]
+        if research_study_id is None:
+            research_study_id = research_study_id_from_questionnaire(instrument)
+
+        patient_fields = ("careProvider", "identifier")
+        document = questionnaire_response.document_answered.copy()
+        subject = questionnaire_response.subject
+        document['encounter'] = questionnaire_response.encounter.as_fhir()
+        document["subject"] = {
+            k: v for k, v in subject.as_fhir().items() if k in patient_fields
+        }
+
+        if subject.organizations:
+            providers = []
+            for org in subject.organizations:
+                org_ref = Reference.organization(org.id).as_fhir()
+                identifiers = [
+                    i.as_fhir() for i in org.identifiers if i.system == "http://pcctc.org/"]
+                if identifiers:
+                    org_ref['identifier'] = identifiers
+                providers.append(org_ref)
+            document["subject"]["careProvider"] = providers
+
+        qb_status = qb_status_visit_name(
+            subject.id,
+            research_study_id,
+            FHIR_datetime.parse(questionnaire_response.document['authored']))
+        document["timepoint"] = qb_status['visit_name']
+
+        research_data = ResearchData(
+            subject_id=subject.id,
+            questionnaire_response_id=questionnaire_response.id,
+            instrument=instrument,
+            research_study_id=research_study_id,
+            authored=FHIR_datetime.parse(document['authored']),
+            data=document
+        )
+        db.session.add(research_data)
+        db.session.commit()
```
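
For readers skimming the diff, the sketch below isolates the pattern it introduces: a critical section keyed per subject and questionnaire response, so concurrent tasks caching the same QNR serialize instead of racing to insert the same row. This is a minimal, standalone illustration, not the portal code: the portal's `TimeoutLock` (a shared lock used as a context manager with `expires`/`timeout` arguments, as seen in the diff) is replaced here with process-local `threading.Lock` objects, and `fake_table` and `insert_research_data` are hypothetical names. The existence check inside the lock stands in for whatever guard the real task performs; the point is that the check and the insert happen atomically relative to other workers holding the same key.

```python
# Standalone sketch of a keyed critical section; see assumptions above.
import threading

_registry_guard = threading.Lock()
_locks = {}      # key -> per-key lock (process-local stand-in for TimeoutLock)
fake_table = {}  # questionnaire_response_id -> cached row (hypothetical store)


def _lock_for(key):
    """Create or fetch the lock for `key` atomically."""
    with _registry_guard:
        return _locks.setdefault(key, threading.Lock())


def insert_research_data(subject_id, qnr_id, payload):
    # Key mirrors the diff: one critical section per (subject, QNR) pair,
    # so inserts for unrelated questionnaire responses still run in parallel.
    key = f"add_research_data.{subject_id}:{qnr_id}"
    with _lock_for(key):
        if qnr_id in fake_table:
            # A concurrent worker already cached this QNR; reuse its row
            # instead of attempting a duplicate insert.
            return fake_table[qnr_id]
        fake_table[qnr_id] = {"subject_id": subject_id, "data": payload}
        return fake_table[qnr_id]


if __name__ == "__main__":
    # Two workers racing on the same questionnaire response (97480 echoes the
    # log above; the subject id is arbitrary) now serialize on one lock:
    workers = [
        threading.Thread(target=insert_research_data, args=(42, 97480, {"n": i}))
        for i in range(2)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(fake_table[97480])  # exactly one row; no duplicate-key error possible
```

Keying the lock on the subject and questionnaire response pair, rather than using a single global lock, keeps unrelated inserts (for example during the initial population of 100K rows) running in parallel while still serializing the rare colliding pair.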
