diff --git a/src/apps/answers/service.py b/src/apps/answers/service.py index 5b708905056..417ae605e73 100644 --- a/src/apps/answers/service.py +++ b/src/apps/answers/service.py @@ -1530,7 +1530,22 @@ async def _prepare_responses(self, answers_map: dict[uuid.UUID, AnswerSchema]) - responses.append(dict(activityId=activity_id, answer=answer_item.answer)) return responses, [ai.user_public_key for ai in answer_items] - async def decrypt_data_for_loris(self, applet_id: uuid.UUID, respondent_id: uuid.UUID) -> dict | None: + async def _prepare_loris_responses( + self, answers_map: dict[uuid.UUID, AnswerSchema] + ) -> tuple[list[dict], list[str]]: + answer_items = await AnswerItemsCRUD(self.answers_session).get_respondent_submits_by_answer_ids( + list(answers_map.keys()) + ) + + responses = list() + for answer_item in answer_items: + answer = answers_map[answer_item.answer_id] + activity_id_version = str(answer.activity_history_id).replace("_", "__") + activity_answer_id = f"{activity_id_version}__{answer_item.answer_id}" + responses.append(dict(activityId=activity_answer_id, answer=answer_item.answer)) + return responses, [ai.user_public_key for ai in answer_items] + + async def decrypt_data_for_loris(self, applet_id: uuid.UUID, respondent_id: uuid.UUID) -> tuple[dict, list] | None: answers = await AnswersCRUD(self.answers_session).get_by_applet_id_and_readiness_to_share_data( applet_id=applet_id, respondent_id=respondent_id ) @@ -1538,9 +1553,10 @@ async def decrypt_data_for_loris(self, applet_id: uuid.UUID, respondent_id: uuid return None answer_map = dict((answer.id, answer) for answer in answers) + answer_versions = [a.version for a in answers] applet = await AppletsCRUD(self.session).get_by_id(applet_id) - responses, user_public_keys = await self._prepare_responses(answer_map) + responses, user_public_keys = await self._prepare_loris_responses(answer_map) data = dict( responses=responses, @@ -1568,7 +1584,7 @@ async def decrypt_data_for_loris(self, applet_id: uuid.UUID, respondent_id: uuid logger.info(f"Successful request (for LORIS) in {duration:.1f}" " seconds.") response_data = await resp.json() # return ReportServerResponse(**response_data) - return response_data + return response_data, answer_versions else: logger.error(f"Failed request (for LORIS) in {duration:.1f}" " seconds.") error_message = await resp.text() diff --git a/src/apps/integrations/loris/domain.py b/src/apps/integrations/loris/domain.py index d4ca85c49bd..b1afc635cec 100644 --- a/src/apps/integrations/loris/domain.py +++ b/src/apps/integrations/loris/domain.py @@ -6,6 +6,7 @@ from apps.activities.domain.conditional_logic import ConditionalLogic __all__ = [ + "UnencryptedAppletVersion", "UnencryptedApplet", "Consent", "ConsentUpdate", @@ -29,7 +30,7 @@ class Item(BaseModel): class Activitie(BaseModel): - id: uuid.UUID + id: str name: str description: str splash_screen: str = "" @@ -46,6 +47,11 @@ class UnencryptedApplet(BaseModel): activities: list[Activitie] +class UnencryptedAppletVersion(BaseModel): + version: str + applet: UnencryptedApplet + + class LorisServerResponse(BaseModel): pass diff --git a/src/apps/integrations/loris/service/loris.py b/src/apps/integrations/loris/service/loris.py index f4a6315337d..345f86ce297 100644 --- a/src/apps/integrations/loris/service/loris.py +++ b/src/apps/integrations/loris/service/loris.py @@ -1,19 +1,21 @@ import datetime +import itertools import json import time import uuid import aiohttp +from pydantic.json import pydantic_encoder -from apps.activities.crud.activity import ActivitiesCRUD -from apps.activities.crud.activity_item import ActivityItemsCRUD +from apps.activities.crud.activity_history import ActivityHistoriesCRUD +from apps.activities.crud.activity_item_history import ActivityItemHistoriesCRUD from apps.answers.crud.answers import AnswersCRUD from apps.answers.errors import ReportServerError from apps.answers.service import ReportServerService -from apps.applets.crud.applets import AppletsCRUD +from apps.applets.crud.applets_history import AppletHistoriesCRUD from apps.integrations.loris.crud.user_relationship import MlLorisUserRelationshipCRUD from apps.integrations.loris.db.schemas import MlLorisUserRelationshipSchema -from apps.integrations.loris.domain import MlLorisUserRelationship, UnencryptedApplet +from apps.integrations.loris.domain import MlLorisUserRelationship, UnencryptedAppletVersion from apps.integrations.loris.errors import LorisServerError, MlLorisUserRelationshipNotFoundError from apps.users.domain import User from config import settings @@ -45,21 +47,24 @@ async def integration(self): ) if not respondents: logger.info( - f"Do not found any respondents for given applet: \ - {str(self.applet_id)}. Finish." + f"No respondents found for given applet: \ + {str(self.applet_id)}. End of the synchronization." ) return users_answers: dict = {} + answer_versions: list = [] for respondent in set(respondents): try: report_service = ReportServerService(self.session) - decrypted_answers: dict[str, list] | None = await report_service.decrypt_data_for_loris( + decrypted_answers_and_versions: tuple[dict, list] | None = await report_service.decrypt_data_for_loris( self.applet_id, respondent ) - if not decrypted_answers: + if not decrypted_answers_and_versions: logger.info("Error during request to report server, no answers") return + decrypted_answers: dict[str, list] = decrypted_answers_and_versions[0] + answer_versions = decrypted_answers_and_versions[1] _result_dict = {} for item in decrypted_answers["result"]: activity_id = item["activityId"] @@ -76,28 +81,56 @@ async def integration(self): logger.info(f"Error during request to report server: {e}") return - applet_crud = AppletsCRUD(self.session) - applet = await applet_crud.get_by_id(self.applet_id) - loris_data = { - "id": self.applet_id, - "displayName": applet.display_name, - "description": list(applet.description.values())[0], - "activities": None, - } - - answers_for_loris_by_respondent: dict - activities: list - activities, answers_for_loris_by_respondent = await self._prepare_activities_and_answers(users_answers) - loris_data["activities"] = activities - activities_ids: list = [str(activitie["id"]) for activitie in activities] - token: str = await self._login_to_loris() headers = { "Authorization": f"Bearer: {token}", "Content-Type": "application/json", "accept": "*/*", } - await self._upload_applet_schema_to_loris(loris_data, headers) + + # check loris for already existing versions of the applet + existing_versions = await self._get_existing_versions_from_loris(self.applet_id, headers) + answer_versions = list(set(answer_versions)) + + missing_applet_versions = list(set(answer_versions) - set(existing_versions)) + # get missing versions of the applet + applet_history_crud = AppletHistoriesCRUD(self.session) + + loris_data = [] + + for version in missing_applet_versions: + applet_version = await applet_history_crud.retrieve_by_applet_version(f"{str(self.applet_id)}_{version}") + loris_data.append( + { + "version": version, + "applet": { + "id": self.applet_id, + "displayName": applet_version.display_name, + "description": list(applet_version.description.values())[0], + "activities": None, + }, + } + ) + + answers_for_loris_by_respondent: dict + + activities_by_versions = await self._prepare_activities(answer_versions) + for loris_datum in loris_data: + loris_datum["applet"]["activities"] = activities_by_versions[loris_datum["version"]] + + activities_map = { + activity["id"]: activity + for activity in list(itertools.chain.from_iterable(list(activities_by_versions.values()))) + } + + answers_for_loris_by_respondent = await self._prepare_answers(users_answers, activities_map) + + activities_ids: list = list(activities_map.keys()) + if loris_data: + await self._upload_applet_schema_to_loris(loris_data, headers) + + # check loris for already existing answers of the applet and filter them out + existing_answers = await self._get_existing_answers_from_loris(self.applet_id, headers) for user, answer in answers_for_loris_by_respondent.items(): candidate_id: str @@ -108,68 +141,86 @@ async def integration(self): except MlLorisUserRelationshipNotFoundError as e: logger.info(f"{e}. Need to create new candidate") candidate_id = await self._create_candidate_and_visit(headers, relationship_crud, uuid.UUID(user)) - - await self._add_instrument_to_loris(headers, candidate_id, activities_ids) - await self._add_instrument_data_to_loris(headers, candidate_id, answer, activities_ids) + filtered_answers = { + key: value for key, value in answer.items() if key.split("__")[2] not in existing_answers + } + if filtered_answers: + await self._add_instrument_to_loris(headers, candidate_id, activities_ids) + await self._add_instrument_data_to_loris(headers, candidate_id, filtered_answers, activities_ids) logger.info(f"Successfully send data for user: {user}," f" with loris id: {candidate_id}") logger.info("All finished") - async def _prepare_activities_and_answers(self, users_answers: dict) -> tuple[list, dict]: - activities_crud = ActivitiesCRUD(self.session) - activities_items_crud = ActivityItemsCRUD(self.session) - answers_for_loris_by_respondent: dict = {} - activities: list = [] - applet_activities = await activities_crud.get_by_applet_id(self.applet_id) - for _activitie in applet_activities: - items: list = [] - _activities_items = await activities_items_crud.get_by_activity_id(_activitie.id) - for item in _activities_items: - items.append( + async def _prepare_activities(self, versions: list) -> dict: + activity_history_crud = ActivityHistoriesCRUD(self.session) + activities_items_history_crud = ActivityItemHistoriesCRUD(self.session) + activities_by_versions: dict = {} + for version in versions: + applet_activities = await activity_history_crud.get_by_applet_id_version(f"{str(self.applet_id)}_{version}") + activities: list = [] + for _activitie in applet_activities: + items: list = [] + _activities_items = await activities_items_history_crud.get_by_activity_id_version( + _activitie.id_version + ) + for item in _activities_items: + items.append( + { + "id": item.id, + "question": list(item.question.values())[0], + "responseType": item.response_type, + "responseValues": item.response_values, + "config": item.config, + "name": item.name, + "isHidden": item.is_hidden, + "conditionalLogic": item.conditional_logic, + "allowEdit": item.allow_edit, + } + ) + activities.append( { - "id": item.id, - "question": list(item.question.values())[0], - "responseType": item.response_type, - "responseValues": item.response_values, - "config": item.config, - "name": item.name, - "isHidden": item.is_hidden, - "conditionalLogic": item.conditional_logic, - "allowEdit": item.allow_edit, + "id": str(_activitie.id_version).replace("_", "__"), + "name": _activitie.name, + "description": list(_activitie.description.values())[0], + "splash_screen": _activitie.splash_screen, + "image": _activitie.image, + "order": _activitie.order, + "createdAt": _activitie.created_at, + "items": items, } ) - activities.append( - { - "id": _activitie.id, - "name": _activitie.name, - "description": list(_activitie.description.values())[0], - "splash_screen": _activitie.splash_screen, - "image": _activitie.image, - "order": _activitie.order, - "createdAt": _activitie.created_at, - "items": items, - } - ) - for user, answers in users_answers.items(): - if str(_activitie.id) in answers: + activities_by_versions[version] = activities + + return activities_by_versions + + async def _prepare_answers(self, users_answers: dict, activities: dict): + answers_for_loris_by_respondent: dict = {} + for user, answers in users_answers.items(): + for id, answer in answers.items(): + activity_id, version, answer_id = id.split("__") + activity_version_id = f"{activity_id}__{version}" + if activity_version_id in activities: answers_for_loris = await self._ml_answer_to_loris( - str(_activitie.id), - items, - answers[str(_activitie.id)], + answer_id, + activity_id, + version, + activities[activity_version_id]["items"], + answer, ) if user not in answers_for_loris_by_respondent: answers_for_loris_by_respondent[user] = answers_for_loris else: answers_for_loris_by_respondent[user].update(answers_for_loris) + return answers_for_loris_by_respondent - return activities, answers_for_loris_by_respondent - - async def _ml_answer_to_loris(self, activitie_id: str, items: list, data: list) -> dict: + async def _ml_answer_to_loris( + self, answer_id: str, activity_id: str, version: str, items: list, data: list + ) -> dict: loris_answers: dict = {} for i in range(len(items)): - key: str = "__".join([activitie_id, items[i]["name"]]) + key: str = "__".join([activity_id, version, answer_id, items[i]["name"]]) match items[i]["responseType"]: case "singleSelect": index = data[i]["value"] @@ -267,14 +318,54 @@ async def _login_to_loris(self) -> str: error_message = await resp.text() raise LorisServerError(message=error_message) - async def _upload_applet_schema_to_loris(self, schema: dict, headers: dict): + async def _get_existing_versions_from_loris(self, applet_id: str, headers: dict) -> str: + timeout = aiohttp.ClientTimeout(total=60) + async with aiohttp.ClientSession(timeout=timeout) as session: + url = settings.loris.ml_schema_existing_versions_url.format(self.applet_id) + logger.info(f"Sending EXISTING SCHEMA VERSIONS request to the loris server {url}") + start = time.time() + async with session.get( + url, + headers=headers, + ) as resp: + duration = time.time() - start + if resp.status == 200: + logger.info(f"Successful request in {duration:.1f} seconds.") + response_data = await resp.json() + return response_data + else: + logger.error(f"Failed request in {duration:.1f} seconds.") + error_message = await resp.text() + raise LorisServerError(message=error_message) + + async def _get_existing_answers_from_loris(self, applet_id: str, headers: dict) -> str: + timeout = aiohttp.ClientTimeout(total=60) + async with aiohttp.ClientSession(timeout=timeout) as session: + url = settings.loris.ml_schema_existing_answers_url.format(self.applet_id) + logger.info(f"Sending EXISTING ANSWERS request to the loris server {url}") + start = time.time() + async with session.get( + url, + headers=headers, + ) as resp: + duration = time.time() - start + if resp.status == 200: + logger.info(f"Successful request in {duration:.1f} seconds.") + response_data = await resp.json() + return response_data + else: + logger.error(f"Failed request in {duration:.1f} seconds.") + error_message = await resp.text() + raise LorisServerError(message=error_message) + + async def _upload_applet_schema_to_loris(self, schemas: list, headers: dict): timeout = aiohttp.ClientTimeout(total=60) async with aiohttp.ClientSession(timeout=timeout) as session: logger.info(f"Sending UPLOAD SCHEMA request to the loris server {settings.loris.ml_schema_url}") start = time.time() async with session.post( settings.loris.ml_schema_url, - data=UnencryptedApplet(**schema).json(), + data=json.dumps([UnencryptedAppletVersion(**schema) for schema in schemas], default=pydantic_encoder), headers=headers, ) as resp: duration = time.time() - start @@ -410,32 +501,34 @@ async def _add_instrument_to_loris(self, headers: dict, candidate_id: str, activ async def _add_instrument_data_to_loris(self, headers: dict, candidate_id: str, answer: dict, activities_ids: list): timeout = aiohttp.ClientTimeout(total=60) async with aiohttp.ClientSession(timeout=timeout) as session: - for activitie_id in activities_ids: - logger.info( - f"Sending SEND INSTUMENT DATA request to the loris server " - f"{settings.loris.instrument_data_url.format(candidate_id, VISIT, activitie_id)}" - ) - start = time.time() - _data_instrument_data = { - "Meta": { - "Instrument": activitie_id, - "Visit": VISIT, - "Candidate": candidate_id, - "DDE": True, - }, - activitie_id: answer, - } - logger.info(f"Sending SEND INSTUMENT DATA is : {json.dumps(_data_instrument_data)} ") - async with session.put( - settings.loris.instrument_data_url.format(candidate_id, VISIT, activitie_id), - data=json.dumps(_data_instrument_data), - headers=headers, - ) as resp: - duration = time.time() - start - if resp.status == 204: - logger.info(f"Successful request in {duration:.1f} seconds.") - else: - logger.info(f"Failed request in {duration:.1f} seconds.") - error_message = await resp.text() - logger.info(f"response is: " f"{error_message}\nstatus is: {resp.status}") - raise LorisServerError(message=error_message) + for activity_id in activities_ids: + answer_by_activity_id = {key: value for key, value in answer.items() if activity_id in key} + if answer_by_activity_id: + logger.info( + f"Sending SEND INSTUMENT DATA request to the loris server " + f"{settings.loris.instrument_data_url.format(candidate_id, VISIT, activity_id)}" + ) + start = time.time() + _data_instrument_data = { + "Meta": { + "Instrument": activity_id, + "Visit": VISIT, + "Candidate": candidate_id, + "DDE": True, + }, + activity_id: answer_by_activity_id, + } + logger.info(f"Sending SEND INSTUMENT DATA is : {json.dumps(_data_instrument_data)} ") + async with session.put( + settings.loris.instrument_data_url.format(candidate_id, VISIT, activity_id), + data=json.dumps(_data_instrument_data), + headers=headers, + ) as resp: + duration = time.time() - start + if resp.status == 204: + logger.info(f"Successful request in {duration:.1f} seconds.") + else: + logger.info(f"Failed request in {duration:.1f} seconds.") + error_message = await resp.text() + logger.info(f"response is: " f"{error_message}\nstatus is: {resp.status}") + raise LorisServerError(message=error_message) diff --git a/src/apps/integrations/loris/tests/test_domain.py b/src/apps/integrations/loris/tests/test_domain.py index 63159e01b3f..81059d4a91a 100644 --- a/src/apps/integrations/loris/tests/test_domain.py +++ b/src/apps/integrations/loris/tests/test_domain.py @@ -43,7 +43,7 @@ def test_item_model(uuid_zero: uuid.UUID): def test_activitie_model(uuid_zero: uuid.UUID): - activitie_id = uuid_zero + activitie_id = str(uuid_zero) item_id = uuid_zero created_at = datetime.datetime.now() activitie_data = { @@ -82,7 +82,7 @@ def test_activitie_model(uuid_zero: uuid.UUID): def test_unencrypted_applet_model(uuid_zero: uuid.UUID): applet_id = uuid_zero - activitie_id = uuid_zero + activitie_id = str(uuid_zero) item_id = uuid_zero created_at = datetime.datetime.now() applet_data = { diff --git a/src/config/integrations.py b/src/config/integrations.py index 227d5147cd8..2b0eefb4abe 100644 --- a/src/config/integrations.py +++ b/src/config/integrations.py @@ -12,3 +12,5 @@ class LorisSettings(BaseSettings): start_visit_url = "https://loris.cmiml.net/api/v0.0.4-dev/candidates/{}/{}" add_instruments_url = "https://loris.cmiml.net/api/v0.0.4-dev/candidates/{}/{}/instruments" instrument_data_url = "https://loris.cmiml.net/api/v0.0.3/candidates/{}/{}/instruments/{}" + ml_schema_existing_versions_url = "https://loris.cmiml.net/mindlogger/v1/applet/{}/versions" + ml_schema_existing_answers_url = "https://loris.cmiml.net/mindlogger/v1/applet/{}/answers"