From b467647ae0bef10c91972e46d80dcca8fac6952b Mon Sep 17 00:00:00 2001 From: Sergei Veselev Date: Thu, 10 Oct 2024 12:54:38 -0400 Subject: [PATCH] CNDE-1805: RTR Observation: Add new columns and logic (#56) --- .../service/InvestigationService.java | 7 +- .../util/ProcessInvestigationDataUtil.java | 306 +++++++++--------- .../InvestigationDataProcessingTests.java | 12 +- .../service/InvestigationServiceTest.java | 8 +- .../ObservationNotificationIds.json | 14 + .../db.rdb_modern.changelog-16.1.yaml | 11 +- ...te_nrt_investigation_notification-002.sql} | 0 .../tables/017-create_nrt_observation-002.sql | 25 -- .../tables/017-create_nrt_observation-003.sql | 29 -- .../tables/017-create_nrt_observation-004.sql | 9 + .../repository/model/dto/Observation.java | 5 +- .../model/dto/ObservationTransformed.java | 45 ++- .../model/reporting/ObservationReporting.java | 15 +- .../service/ObservationService.java | 17 +- .../util/ProcessObservationDataUtil.java | 297 +++++++++++------ .../ObservationDataProcessTests.java | 110 +++++-- .../service/ObservationServiceTest.java | 87 +++-- .../test/resources/rawDataFiles/ActIds.json | 20 ++ .../OrganizationParticipations.json | 33 ++ .../rawDataFiles/ParentObservations.json | 4 +- .../rawDataFiles/PersonParticipations.json | 172 +++++++++- .../PersonParticipationsMorb.json | 71 ++++ 22 files changed, 865 insertions(+), 432 deletions(-) rename liquibase-service/src/main/resources/db/rdb_modern/tables/{012-create-nrt-investigation-notification-002.sql => 012-create_nrt_investigation_notification-002.sql} (100%) create mode 100644 liquibase-service/src/main/resources/db/rdb_modern/tables/017-create_nrt_observation-004.sql create mode 100644 observation-service/src/test/resources/rawDataFiles/ActIds.json create mode 100644 observation-service/src/test/resources/rawDataFiles/PersonParticipationsMorb.json diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java index 20814196..a7d6d4d4 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java @@ -1,7 +1,5 @@ package gov.cdc.etldatapipeline.investigation.service; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import gov.cdc.etldatapipeline.commonutil.NoDataException; import gov.cdc.etldatapipeline.commonutil.json.CustomJsonGeneratorImpl; import gov.cdc.etldatapipeline.investigation.repository.model.dto.NotificationUpdate; @@ -49,7 +47,6 @@ public class InvestigationService { private static final Logger logger = LoggerFactory.getLogger(InvestigationService.class); private final ExecutorService phcExecutor = Executors.newFixedThreadPool(nProc*2, new CustomizableThreadFactory("phc-")); - private static final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); @Value("${spring.kafka.input.topic-name-phc}") private String investigationTopic; @@ -128,7 +125,7 @@ public void processInvestigation(String value) { // only process and send notifications when investigation data has been sent .whenComplete((res, ex) -> logger.info("Investigation data (uid={}) sent to {}", phcUid, investigationTopicReporting)) - .thenRunAsync(() -> processDataUtil.processNotifications(investigation.getInvestigationNotifications(), objectMapper)) + .thenRunAsync(() -> processDataUtil.processNotifications(investigation.getInvestigationNotifications())) .join(); } else { throw new EntityNotFoundException("Unable to find Investigation with id: " + publicHealthCaseUid); @@ -151,7 +148,7 @@ public void processNotification(String value) { Optional notificationData = notificationRepository.computeNotifications(notificationUid); if (notificationData.isPresent()) { NotificationUpdate notification = notificationData.get(); - processDataUtil.processNotifications(notification.getInvestigationNotifications(), objectMapper); + processDataUtil.processNotifications(notification.getInvestigationNotifications()); } else { throw new EntityNotFoundException("Unable to find Notification with id; " + notificationUid ); } diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/util/ProcessInvestigationDataUtil.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/util/ProcessInvestigationDataUtil.java index 86dc0529..8cd2c3c9 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/util/ProcessInvestigationDataUtil.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/util/ProcessInvestigationDataUtil.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import gov.cdc.etldatapipeline.commonutil.json.CustomJsonGeneratorImpl; import gov.cdc.etldatapipeline.investigation.repository.model.dto.*; import gov.cdc.etldatapipeline.investigation.repository.model.reporting.InvestigationKey; @@ -28,6 +29,7 @@ @Setter public class ProcessInvestigationDataUtil { private static final Logger logger = LoggerFactory.getLogger(ProcessInvestigationDataUtil.class); + private static final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); @Value("${spring.kafka.output.topic-name-confirmation}") public String investigationConfirmationOutputTopicName; @@ -45,235 +47,221 @@ public class ProcessInvestigationDataUtil { private final InvestigationCaseAnswerRepository investigationCaseAnswerRepository; private final InvestigationRepository investigationRepository; + public static final String TYPE_CD = "type_cd"; + @Transactional(transactionManager = "rdbTransactionManager") public InvestigationTransformed transformInvestigationData(Investigation investigation) { InvestigationTransformed investigationTransformed = new InvestigationTransformed(); - ObjectMapper objectMapper = new ObjectMapper(); - transformPersonParticipations(investigation.getPersonParticipations(), investigationTransformed, objectMapper); - transformOrganizationParticipations(investigation.getOrganizationParticipations(), investigationTransformed, objectMapper); - transformActIds(investigation.getActIds(), investigationTransformed, objectMapper); - transformObservationIds(investigation.getObservationNotificationIds(), investigationTransformed, objectMapper); - transformInvestigationConfirmationMethod(investigation.getInvestigationConfirmationMethod(), objectMapper); - processInvestigationPageCaseAnswer(investigation.getInvestigationCaseAnswer(), investigationTransformed, objectMapper); + transformPersonParticipations(investigation.getPersonParticipations(), investigationTransformed); + transformOrganizationParticipations(investigation.getOrganizationParticipations(), investigationTransformed); + transformActIds(investigation.getActIds(), investigationTransformed); + transformObservationIds(investigation.getObservationNotificationIds(), investigationTransformed); + transformInvestigationConfirmationMethod(investigation.getInvestigationConfirmationMethod()); + processInvestigationPageCaseAnswer(investigation.getInvestigationCaseAnswer(), investigationTransformed); return investigationTransformed; } - public void processNotifications(String investigationNotifications, ObjectMapper objectMapper) { + public void processNotifications(String investigationNotifications) { try { - JsonNode investigationNotificationsJsonArray = parseJsonArray(investigationNotifications, objectMapper); + JsonNode investigationNotificationsJsonArray = parseJsonArray(investigationNotifications); - if (investigationNotificationsJsonArray != null) { - InvestigationNotificationKey investigationNotificationKey = new InvestigationNotificationKey(); - for (JsonNode node : investigationNotificationsJsonArray) { - Long notificationUid = node.get("notification_uid").asLong(); - investigationNotificationKey.setNotificationUid(notificationUid); + InvestigationNotificationKey investigationNotificationKey = new InvestigationNotificationKey(); + for (JsonNode node : investigationNotificationsJsonArray) { + Long notificationUid = node.get("notification_uid").asLong(); + investigationNotificationKey.setNotificationUid(notificationUid); - InvestigationNotification tempInvestigationNotificationObject = objectMapper.treeToValue(node, InvestigationNotification.class); + InvestigationNotification tempInvestigationNotificationObject = objectMapper.treeToValue(node, InvestigationNotification.class); - String jsonKey = jsonGenerator.generateStringJson(investigationNotificationKey); - String jsonValue = jsonGenerator.generateStringJson(tempInvestigationNotificationObject); - kafkaTemplate.send(investigationNotificationsOutputTopicName, jsonKey, jsonValue) - .whenComplete((res, e) -> logger.info("Notification data (uid={}) sent to {}", notificationUid, investigationNotificationsOutputTopicName)); - } - } - else { - logger.info("InvestigationNotification array is null."); + String jsonKey = jsonGenerator.generateStringJson(investigationNotificationKey); + String jsonValue = jsonGenerator.generateStringJson(tempInvestigationNotificationObject); + kafkaTemplate.send(investigationNotificationsOutputTopicName, jsonKey, jsonValue) + .whenComplete((res, e) -> logger.info("Notification data (uid={}) sent to {}", notificationUid, investigationNotificationsOutputTopicName)); } + } catch (IllegalArgumentException ex) { + logger.info(ex.getMessage(), "InvestigationNotification"); } catch (Exception e) { logger.error("Error processing Notifications JSON array from investigation data: {}", e.getMessage()); } } - private void transformPersonParticipations(String personParticipations, InvestigationTransformed investigationTransformed, ObjectMapper objectMapper) { + private void transformPersonParticipations(String personParticipations, InvestigationTransformed investigationTransformed) { try { - JsonNode personParticipationsJsonArray = parseJsonArray(personParticipations, objectMapper); - - if (personParticipationsJsonArray != null) { - for (JsonNode node : personParticipationsJsonArray) { - String typeCode = node.get("type_cd").asText(); - String subjectClassCode = node.get("subject_class_cd").asText(); - String personCode = node.get("person_cd").asText(); - Long entityId = node.get("entity_id").asLong(); - - if (typeCode.equals("InvestgrOfPHC") && subjectClassCode.equals("PSN") && personCode.equals("PRV")) { - investigationTransformed.setInvestigatorId(entityId); - } - if (typeCode.equals("PhysicianOfPHC") && subjectClassCode.equals("PSN") && personCode.equals("PRV")) { - investigationTransformed.setPhysicianId(entityId); - } - if (typeCode.equals("SubjOfPHC") && subjectClassCode.equals("PSN") && personCode.equals("PAT")) { - investigationTransformed.setPatientId(entityId); - } + JsonNode personParticipationsJsonArray = parseJsonArray(personParticipations); + + for (JsonNode node : personParticipationsJsonArray) { + String typeCode = node.get(TYPE_CD).asText(); + String subjectClassCode = node.get("subject_class_cd").asText(); + String personCode = node.get("person_cd").asText(); + Long entityId = node.get("entity_id").asLong(); + + if (typeCode.equals("InvestgrOfPHC") && subjectClassCode.equals("PSN") && personCode.equals("PRV")) { + investigationTransformed.setInvestigatorId(entityId); + } + if (typeCode.equals("PhysicianOfPHC") && subjectClassCode.equals("PSN") && personCode.equals("PRV")) { + investigationTransformed.setPhysicianId(entityId); + } + if (typeCode.equals("SubjOfPHC") && subjectClassCode.equals("PSN") && personCode.equals("PAT")) { + investigationTransformed.setPatientId(entityId); } } - else { - logger.info("PersonParticipations array is null."); - } + } catch (IllegalArgumentException ex) { + logger.info(ex.getMessage(), "PersonParticipations"); } catch (Exception e) { logger.error("Error processing Person Participation JSON array from investigation data: {}", e.getMessage()); } } - private void transformOrganizationParticipations(String organizationParticipations, InvestigationTransformed investigationTransformed, ObjectMapper objectMapper) { + private void transformOrganizationParticipations(String organizationParticipations, InvestigationTransformed investigationTransformed) { try { - JsonNode organizationParticipationsJsonArray = parseJsonArray(organizationParticipations, objectMapper); + JsonNode organizationParticipationsJsonArray = parseJsonArray(organizationParticipations); - if(organizationParticipationsJsonArray != null) { - for(JsonNode node : organizationParticipationsJsonArray) { - String typeCode = node.get("type_cd").asText(); - String subjectClassCode = node.get("subject_class_cd").asText(); + for (JsonNode node : organizationParticipationsJsonArray) { + String typeCode = node.get(TYPE_CD).asText(); + String subjectClassCode = node.get("subject_class_cd").asText(); - if(typeCode.equals("OrgAsReporterOfPHC") && subjectClassCode.equals("ORG")) { - investigationTransformed.setOrganizationId(node.get("entity_id").asLong()); - } + if (typeCode.equals("OrgAsReporterOfPHC") && subjectClassCode.equals("ORG")) { + investigationTransformed.setOrganizationId(node.get("entity_id").asLong()); } } - else { - logger.info("OrganizationParticipations array is null."); - } + } catch (IllegalArgumentException ex) { + logger.info(ex.getMessage(), "OrganizationParticipations"); } catch (Exception e) { logger.error("Error processing Organization Participation JSON array from investigation data: {}", e.getMessage()); } } - private void transformActIds(String actIds, InvestigationTransformed investigationTransformed, ObjectMapper objectMapper) { + private void transformActIds(String actIds, InvestigationTransformed investigationTransformed) { try { - JsonNode actIdsJsonArray = parseJsonArray(actIds, objectMapper); - - if(actIdsJsonArray != null) { - for(JsonNode node : actIdsJsonArray) { - int actIdSeq = node.get("act_id_seq").asInt(); - String typeCode = node.get("type_cd").asText(); - String rootExtension = node.get("root_extension_txt").asText(); - - if(typeCode.equals("STATE") && actIdSeq == 1) { - investigationTransformed.setInvStateCaseId(rootExtension); - } - if(typeCode.equals("CITY") && actIdSeq == 2) { - investigationTransformed.setCityCountyCaseNbr(rootExtension); - } - if(typeCode.equals("LEGACY") && actIdSeq == 3) { - investigationTransformed.setLegacyCaseId(rootExtension); - } + JsonNode actIdsJsonArray = parseJsonArray(actIds); + + for(JsonNode node : actIdsJsonArray) { + int actIdSeq = node.get("act_id_seq").asInt(); + String typeCode = node.get(TYPE_CD).asText(); + String rootExtension = node.get("root_extension_txt").asText(); + + if(typeCode.equals("STATE") && actIdSeq == 1) { + investigationTransformed.setInvStateCaseId(rootExtension); + } + if(typeCode.equals("CITY") && actIdSeq == 2) { + investigationTransformed.setCityCountyCaseNbr(rootExtension); + } + if(typeCode.equals("LEGACY") && actIdSeq == 3) { + investigationTransformed.setLegacyCaseId(rootExtension); } } - else { - logger.info("ActIds array is null."); - } + } catch (IllegalArgumentException ex) { + logger.info(ex.getMessage(), "ActIds"); } catch (Exception e) { logger.error("Error processing Act Ids JSON array from investigation data: {}", e.getMessage()); } } - private void transformObservationIds(String observationNotificationIds, InvestigationTransformed investigationTransformed, ObjectMapper objectMapper) { + private void transformObservationIds(String observationNotificationIds, InvestigationTransformed investigationTransformed) { try { - JsonNode investigationObservationIdsJsonArray = parseJsonArray(observationNotificationIds, objectMapper); + JsonNode investigationObservationIdsJsonArray = parseJsonArray(observationNotificationIds); InvestigationObservation investigationObservation = new InvestigationObservation(); List observationIds = new ArrayList<>(); - if(investigationObservationIdsJsonArray != null) { - for(JsonNode node : investigationObservationIdsJsonArray) { - String sourceClassCode = node.get("source_class_cd").asText(); - String actTypeCode = node.get("act_type_cd").asText(); - Long publicHealthCaseUid = node.get("public_health_case_uid").asLong(); - investigationKey.setPublicHealthCaseUid(publicHealthCaseUid); - - if(sourceClassCode.equals("OBS") && actTypeCode.equals("PHCInvForm")) { - investigationTransformed.setPhcInvFormId(node.get("source_act_uid").asLong()); - } - - if(sourceClassCode.equals("OBS") && actTypeCode.equals("LabReport")) { - investigationObservation.setPublicHealthCaseUid(publicHealthCaseUid); - observationIds.add(node.get("source_act_uid").asLong()); - } + for(JsonNode node : investigationObservationIdsJsonArray) { + String sourceClassCode = node.path("source_class_cd").asText(); + String actTypeCode = node.path("act_type_cd").asText(); + Long sourceActId = node.get("source_act_uid").asLong(); + Long publicHealthCaseUid = node.get("public_health_case_uid").asLong(); + investigationKey.setPublicHealthCaseUid(publicHealthCaseUid); + + if(sourceClassCode.equals("OBS") && actTypeCode.equals("PHCInvForm")) { + investigationTransformed.setPhcInvFormId(sourceActId); + } + + if(sourceClassCode.equals("OBS") && actTypeCode.equals("LabReport")) { + investigationObservation.setPublicHealthCaseUid(publicHealthCaseUid); + observationIds.add(sourceActId); } - for(Long id : observationIds) { - investigationObservation.setObservationId(id); - String jsonValue = jsonGenerator.generateStringJson(investigationObservation); - kafkaTemplate.send(investigationObservationOutputTopicName, jsonValue, jsonValue); + if(sourceClassCode.equals("OBS") && actTypeCode.equals("MorbReport")) { + investigationObservation.setPublicHealthCaseUid(publicHealthCaseUid); + observationIds.add(sourceActId); } } - else { - logger.info("InvestigationObservationIds array is null."); + + for(Long id : observationIds) { + investigationObservation.setObservationId(id); + String jsonValue = jsonGenerator.generateStringJson(investigationObservation); + kafkaTemplate.send(investigationObservationOutputTopicName, jsonValue, jsonValue); } + } catch (IllegalArgumentException ex) { + logger.info(ex.getMessage(), "InvestigationObservationIds"); } catch (Exception e) { logger.error("Error processing Observation Ids JSON array from investigation data: {}", e.getMessage()); } } - private void transformInvestigationConfirmationMethod(String investigationConfirmationMethod, ObjectMapper objectMapper) { + private void transformInvestigationConfirmationMethod(String investigationConfirmationMethod) { try { - JsonNode investigationConfirmationMethodJsonArray = parseJsonArray(investigationConfirmationMethod, objectMapper); - - if(investigationConfirmationMethodJsonArray != null) { - InvestigationConfirmationMethodKey investigationConfirmationMethodKey = new InvestigationConfirmationMethodKey(); - InvestigationConfirmationMethod investigationConfirmation = new InvestigationConfirmationMethod(); - Map confirmationMethodMap = new HashMap<>(); - String confirmationMethodTime = null; - - // Redundant time variable in case if confirmation_method_time is null in all rows of the array - String phcLastChgTime = investigationConfirmationMethodJsonArray.get(0).get("phc_last_chg_time").asText(); - Long publicHealthCaseUid = investigationConfirmationMethodJsonArray.get(0).get("public_health_case_uid").asLong(); - - for(JsonNode node : investigationConfirmationMethodJsonArray) { - JsonNode timeNode = node.get("confirmation_method_time"); - if (timeNode != null && !timeNode.isNull()) { - confirmationMethodTime = timeNode.asText(); - } - confirmationMethodMap.put(node.get("confirmation_method_cd").asText(), node.get("confirmation_method_desc_txt").asText()); - } - investigationConfirmation.setPublicHealthCaseUid(publicHealthCaseUid); - investigationConfirmationMethodKey.setPublicHealthCaseUid(publicHealthCaseUid); - - investigationConfirmation.setConfirmationMethodTime( - confirmationMethodTime == null ? phcLastChgTime : confirmationMethodTime); - - for(Map.Entry entry : confirmationMethodMap.entrySet()) { - investigationConfirmation.setConfirmationMethodCd(entry.getKey()); - investigationConfirmation.setConfirmationMethodDescTxt(entry.getValue()); - investigationConfirmationMethodKey.setConfirmationMethodCd(entry.getKey()); - String jsonKey = jsonGenerator.generateStringJson(investigationConfirmationMethodKey); - String jsonValue = jsonGenerator.generateStringJson(investigationConfirmation); - kafkaTemplate.send(investigationConfirmationOutputTopicName, jsonKey, jsonValue); + JsonNode investigationConfirmationMethodJsonArray = parseJsonArray(investigationConfirmationMethod); + + InvestigationConfirmationMethodKey investigationConfirmationMethodKey = new InvestigationConfirmationMethodKey(); + InvestigationConfirmationMethod investigationConfirmation = new InvestigationConfirmationMethod(); + Map confirmationMethodMap = new HashMap<>(); + String confirmationMethodTime = null; + + // Redundant time variable in case if confirmation_method_time is null in all rows of the array + String phcLastChgTime = investigationConfirmationMethodJsonArray.get(0).get("phc_last_chg_time").asText(); + Long publicHealthCaseUid = investigationConfirmationMethodJsonArray.get(0).get("public_health_case_uid").asLong(); + + for(JsonNode node : investigationConfirmationMethodJsonArray) { + JsonNode timeNode = node.get("confirmation_method_time"); + if (timeNode != null && !timeNode.isNull()) { + confirmationMethodTime = timeNode.asText(); } + confirmationMethodMap.put(node.get("confirmation_method_cd").asText(), node.get("confirmation_method_desc_txt").asText()); } - else { - logger.info("InvestigationConfirmationMethod array is null."); + investigationConfirmation.setPublicHealthCaseUid(publicHealthCaseUid); + investigationConfirmationMethodKey.setPublicHealthCaseUid(publicHealthCaseUid); + + investigationConfirmation.setConfirmationMethodTime( + confirmationMethodTime == null ? phcLastChgTime : confirmationMethodTime); + + for(Map.Entry entry : confirmationMethodMap.entrySet()) { + investigationConfirmation.setConfirmationMethodCd(entry.getKey()); + investigationConfirmation.setConfirmationMethodDescTxt(entry.getValue()); + investigationConfirmationMethodKey.setConfirmationMethodCd(entry.getKey()); + String jsonKey = jsonGenerator.generateStringJson(investigationConfirmationMethodKey); + String jsonValue = jsonGenerator.generateStringJson(investigationConfirmation); + kafkaTemplate.send(investigationConfirmationOutputTopicName, jsonKey, jsonValue); } + } catch (IllegalArgumentException ex) { + logger.info(ex.getMessage(), "InvestigationObservationIds"); } catch (Exception e) { logger.error("Error processing investigation confirmation method JSON array from investigation data: {}", e.getMessage()); } } - private void processInvestigationPageCaseAnswer(String investigationCaseAnswer, InvestigationTransformed investigationTransformed, ObjectMapper objectMapper) { + private void processInvestigationPageCaseAnswer(String investigationCaseAnswer, InvestigationTransformed investigationTransformed) { try { - JsonNode investigationCaseAnswerJsonArray = parseJsonArray(investigationCaseAnswer, objectMapper); + JsonNode investigationCaseAnswerJsonArray = parseJsonArray(investigationCaseAnswer); - if(investigationCaseAnswerJsonArray != null) { - Long actUid = investigationCaseAnswerJsonArray.get(0).get("act_uid").asLong(); - List investigationCaseAnswerList = new ArrayList<>(); + Long actUid = investigationCaseAnswerJsonArray.get(0).get("act_uid").asLong(); + List investigationCaseAnswerList = new ArrayList<>(); - for(JsonNode node : investigationCaseAnswerJsonArray) { - InvestigationCaseAnswer tempCaseAnswerObject = objectMapper.treeToValue(node, InvestigationCaseAnswer.class); - investigationCaseAnswerList.add(tempCaseAnswerObject); - } + for(JsonNode node : investigationCaseAnswerJsonArray) { + InvestigationCaseAnswer tempCaseAnswerObject = objectMapper.treeToValue(node, InvestigationCaseAnswer.class); + investigationCaseAnswerList.add(tempCaseAnswerObject); + } - investigationCaseAnswerRepository.deleteByActUid(actUid); - investigationCaseAnswerRepository.saveAll(investigationCaseAnswerList); + investigationCaseAnswerRepository.deleteByActUid(actUid); + investigationCaseAnswerRepository.saveAll(investigationCaseAnswerList); - String rdbTblNms = String.join(",", investigationCaseAnswerList.stream() - .map(InvestigationCaseAnswer::getRdbTableNm).collect(Collectors.toSet())); - if (!rdbTblNms.isEmpty()) { - investigationTransformed.setRdbTableNameList(rdbTblNms); - } - } - else { - logger.info("InvestigationCaseAnswerJsonArray array is null."); + String rdbTblNms = String.join(",", investigationCaseAnswerList.stream() + .map(InvestigationCaseAnswer::getRdbTableNm).collect(Collectors.toSet())); + if (!rdbTblNms.isEmpty()) { + investigationTransformed.setRdbTableNameList(rdbTblNms); } + } catch (IllegalArgumentException ex) { + logger.info(ex.getMessage(), "InvestigationCaseAnswer"); } catch (Exception e) { logger.error("Error processing investigation case answer JSON array from investigation data: {}", e.getMessage()); } @@ -291,12 +279,12 @@ public void processPhcFactDatamart(String publicHealthCaseUid) { } } - private JsonNode parseJsonArray(String jsonString, ObjectMapper objectMapper) throws JsonProcessingException { + private JsonNode parseJsonArray(String jsonString) throws JsonProcessingException, IllegalArgumentException { JsonNode jsonArray = jsonString != null ? objectMapper.readTree(jsonString) : null; - if (jsonArray != null) { - return jsonArray.isArray() ? jsonArray : null; + if (jsonArray != null && jsonArray.isArray()) { + return jsonArray; } else { - return null; + throw new IllegalArgumentException("{} array is null."); } } } diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java index 15e593a6..2085a2db 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/InvestigationDataProcessingTests.java @@ -122,7 +122,7 @@ void testTransformInvestigationError(){ investigation.setInvestigationCaseAnswer(invalidJSON); transformer.transformInvestigationData(investigation); - transformer.processNotifications(invalidJSON, objectMapper); + transformer.processNotifications(invalidJSON); List logs = listAppender.list; logs.forEach(le -> assertTrue(le.getFormattedMessage().contains(invalidJSON))); @@ -138,10 +138,10 @@ void testObservationNotificationIds() throws JsonProcessingException { InvestigationObservation observation = new InvestigationObservation(); observation.setPublicHealthCaseUid(investigationUid); - observation.setObservationId(263748596L); + observation.setObservationId(263748599L); transformer.transformInvestigationData(investigation); - verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); + verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); assertEquals(OBSERVATION_TOPIC, topicCaptor.getValue()); var actualObservation = objectMapper.readValue( @@ -165,7 +165,7 @@ void testProcessNotifications() throws JsonProcessingException { when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null)); - transformer.processNotifications(investigation.getInvestigationNotifications(), new ObjectMapper()); + transformer.processNotifications(investigation.getInvestigationNotifications()); verify(kafkaTemplate, times (1)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); assertEquals(NOTIFICATIONS_TOPIC, topicCaptor.getValue()); @@ -185,8 +185,8 @@ void testProcessMissingOrInvalidNotifications() { investigation.setPublicHealthCaseUid(investigationUid); investigation.setInvestigationNotifications(null); transformer.investigationNotificationsOutputTopicName = NOTIFICATIONS_TOPIC; - transformer.processNotifications(null, new ObjectMapper()); - transformer.processNotifications("{\"foo\":\"bar\"}", new ObjectMapper()); + transformer.processNotifications(null); + transformer.processNotifications("{\"foo\":\"bar\"}"); verify(kafkaTemplate, never()).send(eq(NOTIFICATIONS_TOPIC), anyString(), anyString()); } diff --git a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java index ed670d5c..f58c4657 100644 --- a/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java +++ b/investigation-service/src/test/java/gov/cdc/etldatapipeline/investigation/service/InvestigationServiceTest.java @@ -153,11 +153,11 @@ private void validateInvestigationData(String payload, Investigation investigati investigationKey.setPublicHealthCaseUid(investigation.getPublicHealthCaseUid()); final InvestigationReporting reportingModel = constructInvestigationReporting(investigation.getPublicHealthCaseUid()); - verify(kafkaTemplate, times(5)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); + verify(kafkaTemplate, times(6)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); - String actualTopic = topicCaptor.getAllValues().get(3); - String actualKey = keyCaptor.getAllValues().get(3); - String actualValue = messageCaptor.getAllValues().get(3); + String actualTopic = topicCaptor.getAllValues().get(4); + String actualKey = keyCaptor.getAllValues().get(4); + String actualValue = messageCaptor.getAllValues().get(4); var actualReporting = objectMapper.readValue( objectMapper.readTree(actualValue).path("payload").toString(), InvestigationReporting.class); diff --git a/investigation-service/src/test/resources/rawDataFiles/ObservationNotificationIds.json b/investigation-service/src/test/resources/rawDataFiles/ObservationNotificationIds.json index 7493e817..6dcdea3a 100644 --- a/investigation-service/src/test/resources/rawDataFiles/ObservationNotificationIds.json +++ b/investigation-service/src/test/resources/rawDataFiles/ObservationNotificationIds.json @@ -40,6 +40,20 @@ "act_last_chg_user_id": null, "last_chg_user_name": null, "act_last_chg_time": "2023-03-01T07:41:48.640" + }, + { + "source_act_uid": 263748599, + "public_health_case_uid": 234567890, + "source_class_cd": "OBS", + "target_class_cd": "CASE", + "act_type_cd": "MorbReport", + "status_cd": "A", + "act_add_time": "2023-03-01T07:41:48.640", + "act_add_user_id": null, + "add_user_name": null, + "act_last_chg_user_id": null, + "last_chg_user_name": null, + "act_last_chg_time": "2023-03-01T07:41:48.640" } ] \ No newline at end of file diff --git a/liquibase-service/src/main/resources/db/changelog/db.rdb_modern.changelog-16.1.yaml b/liquibase-service/src/main/resources/db/changelog/db.rdb_modern.changelog-16.1.yaml index a6619733..56590ae6 100644 --- a/liquibase-service/src/main/resources/db/changelog/db.rdb_modern.changelog-16.1.yaml +++ b/liquibase-service/src/main/resources/db/changelog/db.rdb_modern.changelog-16.1.yaml @@ -95,7 +95,7 @@ databaseChangeLog: author: liquibase changes: - sqlFile: - path: 012-create-nrt-investigation-notification-002.sql + path: 012-create_nrt_investigation_notification-002.sql splitStatements: false - changeSet: id: 14 @@ -355,4 +355,11 @@ databaseChangeLog: changes: - sqlFile: path: 017-sp_d_labtest_result_postprocessing-001.sql - splitStatements: false \ No newline at end of file + splitStatements: false + - changeSet: + id: 51 + author: liquibase + changes: + - sqlFile: + path: 017-create_nrt_observation-004.sql + splitStatements: false diff --git a/liquibase-service/src/main/resources/db/rdb_modern/tables/012-create-nrt-investigation-notification-002.sql b/liquibase-service/src/main/resources/db/rdb_modern/tables/012-create_nrt_investigation_notification-002.sql similarity index 100% rename from liquibase-service/src/main/resources/db/rdb_modern/tables/012-create-nrt-investigation-notification-002.sql rename to liquibase-service/src/main/resources/db/rdb_modern/tables/012-create_nrt_investigation_notification-002.sql diff --git a/liquibase-service/src/main/resources/db/rdb_modern/tables/017-create_nrt_observation-002.sql b/liquibase-service/src/main/resources/db/rdb_modern/tables/017-create_nrt_observation-002.sql index 363c237e..f7cae53f 100644 --- a/liquibase-service/src/main/resources/db/rdb_modern/tables/017-create_nrt_observation-002.sql +++ b/liquibase-service/src/main/resources/db/rdb_modern/tables/017-create_nrt_observation-002.sql @@ -7,29 +7,4 @@ IF EXISTS (SELECT 1 FROM sysobjects WHERE name = 'nrt_observation' and xtype = ' ADD ctrl_cd_display_form varchar(20); END; - IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE Name = N'obs_domain_cd_st_1' AND Object_ID = Object_ID(N'nrt_observation')) - BEGIN - ALTER TABLE nrt_observation - ADD obs_domain_cd_st_1 varchar(20); - - END; - IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE Name = N'processing_decision_cd' AND Object_ID = Object_ID(N'nrt_observation')) - BEGIN - ALTER TABLE nrt_observation - ADD processing_decision_cd varchar(20); - - END; - - IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE Name = N'cd' AND Object_ID = Object_ID(N'nrt_observation')) - BEGIN - ALTER TABLE nrt_observation - ADD cd varchar(50); - END; - - IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE Name = N'shared_ind' AND Object_ID = Object_ID(N'nrt_observation')) - BEGIN - ALTER TABLE nrt_observation - ADD shared_ind char(1); - END; - END; \ No newline at end of file diff --git a/liquibase-service/src/main/resources/db/rdb_modern/tables/017-create_nrt_observation-003.sql b/liquibase-service/src/main/resources/db/rdb_modern/tables/017-create_nrt_observation-003.sql index ef906e6b..9ed72898 100644 --- a/liquibase-service/src/main/resources/db/rdb_modern/tables/017-create_nrt_observation-003.sql +++ b/liquibase-service/src/main/resources/db/rdb_modern/tables/017-create_nrt_observation-003.sql @@ -1,34 +1,5 @@ IF EXISTS (SELECT 1 FROM sysobjects WHERE name = 'nrt_observation' and xtype = 'U') BEGIN - IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE name = N'ctrl_cd_display_form' AND object_id = Object_ID(N'nrt_observation')) - BEGIN - ALTER TABLE nrt_observation - ADD ctrl_cd_display_form varchar(20); - END; - - IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE name = N'obs_domain_cd_st_1' AND object_id = Object_ID(N'nrt_observation')) - BEGIN - ALTER TABLE nrt_observation - ADD obs_domain_cd_st_1 varchar(20); - END; - - IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE name = N'processing_decision_cd' AND object_id = Object_ID(N'nrt_observation')) - BEGIN - ALTER TABLE nrt_observation - ADD processing_decision_cd varchar(20); - END; - - IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE name = N'cd' AND object_id = Object_ID(N'nrt_observation')) - BEGIN - ALTER TABLE nrt_observation - ADD cd varchar(50); - END; - - IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE name = N'shared_ind' AND object_id = Object_ID(N'nrt_observation')) - BEGIN - ALTER TABLE nrt_observation - ADD shared_ind char(1); - END; IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE name = N'status_cd' AND object_id = Object_ID(N'nrt_observation')) BEGIN diff --git a/liquibase-service/src/main/resources/db/rdb_modern/tables/017-create_nrt_observation-004.sql b/liquibase-service/src/main/resources/db/rdb_modern/tables/017-create_nrt_observation-004.sql new file mode 100644 index 00000000..5138e8d8 --- /dev/null +++ b/liquibase-service/src/main/resources/db/rdb_modern/tables/017-create_nrt_observation-004.sql @@ -0,0 +1,9 @@ +IF EXISTS (SELECT 1 FROM sysobjects WHERE name = 'nrt_observation' and xtype = 'U') + BEGIN + + IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE Name = N'cd_desc_txt' AND Object_ID = Object_ID(N'nrt_observation')) + BEGIN + EXEC sys.sp_rename N'nrt_observation.cd_desc_text', N'cd_desc_txt', 'COLUMN'; + END; + + END; \ No newline at end of file diff --git a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/repository/model/dto/Observation.java b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/repository/model/dto/Observation.java index 6cd6b4b6..6a8e5b48 100644 --- a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/repository/model/dto/Observation.java +++ b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/repository/model/dto/Observation.java @@ -34,7 +34,7 @@ public class Observation { private String obsDomainCdSt1; @Column(name = "cd_desc_txt") - private String cdDescText; + private String cdDescTxt; @Column(name = "record_status_cd") private String recordStatusCd; @@ -120,6 +120,9 @@ public class Observation { @Column(name = "txt") private String txt; + @Column(name = "priority_cd") + private String priorityCd; + @Column(name = "add_user_id") private Long addUserId; diff --git a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/repository/model/dto/ObservationTransformed.java b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/repository/model/dto/ObservationTransformed.java index 630deccf..9b988d62 100644 --- a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/repository/model/dto/ObservationTransformed.java +++ b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/repository/model/dto/ObservationTransformed.java @@ -2,18 +2,45 @@ import lombok.*; -@Setter -@Getter +@Data public class ObservationTransformed { - private Long orderingPersonId; + private Long observationUid; + private Long reportObservationUid; + private Long reportSprtUid; + private Long reportRefrUid; + private String resultObservationUid; + private String followUpObservationUid; + private Long patientId; - private Long performingOrganizationId; + private Long orderingPersonId; + private Long morbPhysicianId; + private Long morbReporterId; + private Long morbHospReporterId; + private Long morbHospId; + + private Long transcriptionistId; + private String transcriptionistVal; + private String transcriptionistFirstNm; + private String transcriptionistLastNm; + private String transcriptionistIdAssignAuth; + private String transcriptionistAuthType; + + private Long assistantInterpreterId; + private String assistantInterpreterVal; + private String assistantInterpreterFirstNm; + private String assistantInterpreterLastNm; + private String assistantInterpreterIdAssignAuth; + private String assistantInterpreterAuthType; + + private Long resultInterpreterId; + private Long specimenCollectorId; + private Long copyToProviderId; + private Long labTestTechnicianId; private Long authorOrganizationId; private Long orderingOrganizationId; + private Long performingOrganizationId; + private Long healthCareId; + + private String accessionNumber; private Long materialId; - private String resultObservationUid; - private String followUpObservationUid; - private Long reportObservationUid; - private Long reportSprtUid; - private Long reportRefrUid; } diff --git a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/repository/model/reporting/ObservationReporting.java b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/repository/model/reporting/ObservationReporting.java index c97a4e8a..441854f4 100644 --- a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/repository/model/reporting/ObservationReporting.java +++ b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/repository/model/reporting/ObservationReporting.java @@ -16,7 +16,6 @@ public class ObservationReporting { private String classCd; private String moodCd; private Long actUid; - private String cdDescText; private String recordStatusCd; private String jurisdictionCd; private Long programJurisdictionOid; @@ -31,6 +30,7 @@ public class ObservationReporting { private String ctrlCdDisplayForm; private String processingDecisionCd; private String cd; + private String cdDescTxt; private String sharedInd; private String statusCd; @@ -59,25 +59,34 @@ public class ObservationReporting { private Long orderingPersonId; private Long morbPhysicianId; private Long morbReporterId; + private Long morbHospReporterId; + private Long morbHospId; + private Long transcriptionistId; private String transcriptionistVal; private String transcriptionistFirstNm; private String transcriptionistLastNm; + private String transcriptionistIdAssignAuth; + private String transcriptionistAuthType; + private Long assistantInterpreterId; private String assistantInterpreterVal; private String assistantInterpreterFirstNm; private String assistantInterpreterLastNm; + private String assistantInterpreterIdAssignAuth; + private String assistantInterpreterAuthType; + private Long resultInterpreterId; private Long specimenCollectorId; private Long copyToProviderId; private Long labTestTechnicianId; - private Long authorOrganizationId; private Long orderingOrganizationId; private Long performingOrganizationId; private Long healthCareId; - private Long morbHospReporterId; + private String accessionNumber; + private String priorityCd; private Long materialId; private Long addUserId; private String addUserName; diff --git a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/service/ObservationService.java b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/service/ObservationService.java index 891b4982..99d676f6 100644 --- a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/service/ObservationService.java +++ b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/service/ObservationService.java @@ -92,7 +92,7 @@ private void processObservation(String value) { if(observationData.isPresent()) { ObservationReporting reportingModel = modelMapper.map(observationData.get(), ObservationReporting.class); ObservationTransformed observationTransformed = processObservationDataUtil.transformObservationData(observationData.get()); - buildReportingModelForTransformedData(reportingModel, observationTransformed); + modelMapper.map(observationTransformed, reportingModel); pushKeyValuePairToKafka(observationKey, reportingModel, observationTopicOutputReporting); logger.info("Observation data (uid={}) sent to {}", observationUid, observationTopicOutputReporting); } @@ -114,19 +114,4 @@ private void pushKeyValuePairToKafka(ObservationKey observationKey, Object model String jsonValue = jsonGenerator.generateStringJson(model); kafkaTemplate.send(topicName, jsonKey, jsonValue); } - - protected void buildReportingModelForTransformedData(ObservationReporting reportingModel, ObservationTransformed observationTransformed) { - reportingModel.setOrderingPersonId(observationTransformed.getOrderingPersonId()); - reportingModel.setPatientId(observationTransformed.getPatientId()); - reportingModel.setPerformingOrganizationId(observationTransformed.getPerformingOrganizationId()); - reportingModel.setAuthorOrganizationId(observationTransformed.getAuthorOrganizationId()); - reportingModel.setOrderingOrganizationId(observationTransformed.getOrderingOrganizationId()); - reportingModel.setMaterialId(observationTransformed.getMaterialId()); - reportingModel.setResultObservationUid(observationTransformed.getResultObservationUid()); - reportingModel.setFollowupObservationUid(observationTransformed.getFollowUpObservationUid()); - reportingModel.setReportObservationUid(Optional.ofNullable(observationTransformed.getReportObservationUid()) - .orElse(reportingModel.getObservationUid())); - reportingModel.setReportRefrUid(observationTransformed.getReportRefrUid()); - reportingModel.setReportSprtUid(observationTransformed.getReportSprtUid()); - } } diff --git a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/util/ProcessObservationDataUtil.java b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/util/ProcessObservationDataUtil.java index 02672839..9398b7a7 100644 --- a/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/util/ProcessObservationDataUtil.java +++ b/observation-service/src/main/java/gov/cdc/etldatapipeline/observation/util/ProcessObservationDataUtil.java @@ -16,8 +16,11 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; -import java.util.*; -import java.util.function.BiPredicate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; @Component @RequiredArgsConstructor @Setter @@ -54,11 +57,13 @@ public class ProcessObservationDataUtil { private static final String SUBJECT_CLASS_CD = "subject_class_cd"; public static final String TYPE_CD = "type_cd"; public static final String ENTITY_ID = "entity_id"; - public static final String DOM_ORDER = "Order"; - public static final String DOM_RESULT = "Result"; + public static final String ORDER = "Order"; + public static final String RESULT = "Result"; public ObservationTransformed transformObservationData(Observation observation) { ObservationTransformed observationTransformed = new ObservationTransformed(); + observationTransformed.setObservationUid(observation.getObservationUid()); + observationTransformed.setReportObservationUid(observation.getObservationUid()); String obsDomainCdSt1 = observation.getObsDomainCdSt1(); @@ -67,6 +72,7 @@ public ObservationTransformed transformObservationData(Observation observation) transformMaterialParticipations(observation.getMaterialParticipations(), obsDomainCdSt1, observationTransformed); transformFollowupObservations(observation.getFollowupObservations(), obsDomainCdSt1, observationTransformed); transformParentObservations(observation.getParentObservations(), obsDomainCdSt1, observationTransformed); + transformActIds(observation.getActIds(), observationTransformed); transformObservationCoded(observation.getObsCode()); transformObservationDate(observation.getObsDate()); transformObservationEdx(observation.getEdxIds()); @@ -82,60 +88,131 @@ private void transformPersonParticipations(String personParticipations, String o JsonNode personParticipationsJsonArray = parseJsonArray(personParticipations); for (JsonNode jsonNode : personParticipationsJsonArray) { - String typeCd = getNodeValue(jsonNode.get(TYPE_CD)); - String subjectClassCd = getNodeValue(jsonNode.get(SUBJECT_CLASS_CD)); + assertDomainCdMatches(obsDomainCdSt1, ORDER, RESULT); - if(obsDomainCdSt1.equals(DOM_ORDER)) { - if(typeAndClassNull.test(typeCd, subjectClassCd)) { - if("ORD".equals(typeCd) && "PSN".equals(subjectClassCd)) { - observationTransformed.setOrderingPersonId(jsonNode.get(ENTITY_ID).asLong()); - } - if ("PATSBJ".equals(typeCd) && "PSN".equals(subjectClassCd)) { - observationTransformed.setPatientId(jsonNode.get(ENTITY_ID).asLong()); + String typeCd = getNodeValue(jsonNode, TYPE_CD, JsonNode::asText); + Long entityId = getNodeValue(jsonNode, ENTITY_ID, JsonNode::asLong); + + if (typeCd.equals("PATSBJ")) { + transformPersonParticipationRoles(jsonNode, observationTransformed, entityId); + } + + if (ORDER.equals(obsDomainCdSt1)) { + String subjectClassCd = getNodeValue(jsonNode, SUBJECT_CLASS_CD, JsonNode::asText); + if ("PSN".equals(subjectClassCd)) { + switch (typeCd) { + case "ORD": + observationTransformed.setOrderingPersonId(entityId); + break; + case "PATSBJ", "SubjOfMorbReport": + observationTransformed.setPatientId(entityId); + break; + case "PhysicianOfMorb": + observationTransformed.setMorbPhysicianId(entityId); + break; + case "ReporterOfMorbReport": + observationTransformed.setMorbReporterId(entityId); + break; + case "ENT": + observationTransformed.setTranscriptionistId(entityId); + Optional.ofNullable(jsonNode.get("first_nm")).filter(n -> !n.isNull()) + .ifPresent(n -> observationTransformed.setTranscriptionistFirstNm(n.asText())); + Optional.ofNullable(jsonNode.get("last_nm")).filter(n -> !n.isNull()) + .ifPresent(n -> observationTransformed.setTranscriptionistLastNm(n.asText())); + Optional.ofNullable(jsonNode.get("person_id_val")).filter(n -> !n.isNull()) + .ifPresent(n -> observationTransformed.setTranscriptionistVal(n.asText())); + Optional.ofNullable(jsonNode.get("person_id_assign_auth_cd")).filter(n -> !n.isNull()) + .ifPresent(n -> observationTransformed.setTranscriptionistIdAssignAuth(n.asText())); + Optional.ofNullable(jsonNode.get("person_id_type_desc")).filter(n -> !n.isNull()) + .ifPresent(n -> observationTransformed.setTranscriptionistAuthType(n.asText())); + break; + case "ASS": + observationTransformed.setAssistantInterpreterId(entityId); + Optional.ofNullable(jsonNode.get("first_nm")).filter(n -> !n.isNull()) + .ifPresent(n -> observationTransformed.setAssistantInterpreterFirstNm(n.asText())); + Optional.ofNullable(jsonNode.get("last_nm")).filter(n -> !n.isNull()) + .ifPresent(n -> observationTransformed.setAssistantInterpreterLastNm(n.asText())); + Optional.ofNullable(jsonNode.get("person_id_val")).filter(n -> !n.isNull()) + .ifPresent(n -> observationTransformed.setAssistantInterpreterVal(n.asText())); + Optional.ofNullable(jsonNode.get("person_id_assign_auth_cd")).filter(n -> !n.isNull()) + .ifPresent(n -> observationTransformed.setAssistantInterpreterIdAssignAuth(n.asText())); + Optional.ofNullable(jsonNode.get("person_id_type_desc")).filter(n -> !n.isNull()) + .ifPresent(n -> observationTransformed.setAssistantInterpreterAuthType(n.asText())); + break; + case "VRF": + observationTransformed.setResultInterpreterId(entityId); + break; + case "PRF": + observationTransformed.setLabTestTechnicianId(entityId); + break; + default: } - } else { - logger.error("Type_cd or subject_class_cd is null for the personParticipations: {}", personParticipations); } - } else { - logger.error("obsDomainCdSt1: {} is not valid for the personParticipations.", obsDomainCdSt1); } } } catch (IllegalArgumentException ex) { - logger.info("PersonParticipations array is null."); + logger.info(ex.getMessage(), "PersonParticipations", personParticipations); } catch (Exception e) { logger.error("Error processing Person Participation JSON array from observation data: {}", e.getMessage()); } } + private void transformPersonParticipationRoles(JsonNode node, ObservationTransformed observationTransformed, Long entityId) { + String roleSubject = node.path("role_subject_class_cd").asText(); + if ("PROV".equals(roleSubject)) { + String roleCd = node.path("role_cd").asText(); + if ("SPP".equals(roleCd)) { + String roleScoping = node.path("role_scoping_class_cd").asText(); + if ("PSN".equals(roleScoping)) { + observationTransformed.setSpecimenCollectorId(entityId); + } + } else if ("CT".equals(roleCd)) { + observationTransformed.setCopyToProviderId(entityId); + } + } + } + private void transformOrganizationParticipations(String organizationParticipations, String obsDomainCdSt1, ObservationTransformed observationTransformed) { try { JsonNode organizationParticipationsJsonArray = parseJsonArray(organizationParticipations); - for(JsonNode jsonNode : organizationParticipationsJsonArray) { - String typeCd = getNodeValue(jsonNode.get(TYPE_CD)); - String subjectClassCd = getNodeValue(jsonNode.get(SUBJECT_CLASS_CD)); + for (JsonNode jsonNode : organizationParticipationsJsonArray) { + assertDomainCdMatches(obsDomainCdSt1, RESULT, ORDER); + + String typeCd = getNodeValue(jsonNode, TYPE_CD, JsonNode::asText); + String subjectClassCd = getNodeValue(jsonNode, SUBJECT_CLASS_CD, JsonNode::asText); + Long entityId = getNodeValue(jsonNode, ENTITY_ID, JsonNode::asLong); - if (typeAndClassNull.test(typeCd, subjectClassCd)) { - if (obsDomainCdSt1.equals(DOM_RESULT)) { - if("PRF".equals(typeCd) && "ORG".equals(subjectClassCd)) { - observationTransformed.setPerformingOrganizationId(jsonNode.get(ENTITY_ID).asLong()); + if (subjectClassCd.equals("ORG")) { + if (RESULT.equals(obsDomainCdSt1)) { + if ("PRF".equals(typeCd)) { + observationTransformed.setPerformingOrganizationId(entityId); + } + } else if (ORDER.equals(obsDomainCdSt1)) { + switch (typeCd) { + case "AUT": + observationTransformed.setAuthorOrganizationId(entityId); + break; + case "ORD": + observationTransformed.setOrderingOrganizationId(entityId); + break; + case "HCFAC": + observationTransformed.setHealthCareId(entityId); + break; + case "ReporterOfMorbReport": + observationTransformed.setMorbHospReporterId(entityId); + break; + case "HospOfMorbObs": + observationTransformed.setMorbHospId(entityId); + break; + default: + break; } - } else if(obsDomainCdSt1.equals(DOM_ORDER)) { - if("AUT".equals(typeCd) && "ORG".equals(subjectClassCd)) { - observationTransformed.setAuthorOrganizationId(jsonNode.get(ENTITY_ID).asLong()); - } - if("ORD".equals(typeCd) && "ORG".equals(subjectClassCd)) { - observationTransformed.setOrderingOrganizationId(jsonNode.get(ENTITY_ID).asLong()); - } - } else { - logger.error("obsDomainCdSt1: {} is not valid for the organizationParticipations", obsDomainCdSt1); } - } else { - logger.error("Type_cd or subject_class_cd is null for the organizationParticipations: {}", organizationParticipations); } } } catch (IllegalArgumentException ex) { - logger.info("OrganizationParticipations array is null."); + logger.info(ex.getMessage(), "OrganizationParticipations", organizationParticipations); } catch (Exception e) { logger.error("Error processing Organization Participation JSON array from observation data: {}", e.getMessage()); } @@ -146,31 +223,23 @@ private void transformMaterialParticipations(String materialParticipations, Stri JsonNode materialParticipationsJsonArray = parseJsonArray(materialParticipations); for (JsonNode jsonNode : materialParticipationsJsonArray) { - String typeCd = getNodeValue(jsonNode.get(TYPE_CD)); - String subjectClassCd = getNodeValue(jsonNode.get(SUBJECT_CLASS_CD)); - - if (obsDomainCdSt1.equals(DOM_ORDER)) { - if (typeAndClassNull.test(typeCd, subjectClassCd)) { - if ("SPC".equals(typeCd) && "MAT".equals(subjectClassCd)) { - Long materialId = jsonNode.get(ENTITY_ID).asLong(); - observationTransformed.setMaterialId(materialId); - - ObservationMaterial material = objectMapper.treeToValue(jsonNode, ObservationMaterial.class); - material.setMaterialId(materialId); - ObservationMaterialKey key = new ObservationMaterialKey(); - key.setMaterialId(observationTransformed.getMaterialId()); - sendToKafka(key, material, materialTopicName, materialId, "Observation Material data (uid={}) sent to {}"); - } - } else { - logger.error("Type_cd or subject_class_cd is null for the materialParticipations: {}", materialParticipations); - } - } - else { - logger.error("obsDomainCdSt1: {} is not valid for the materialParticipations", obsDomainCdSt1); + String typeCd = getNodeValue(jsonNode, TYPE_CD, JsonNode::asText); + String subjectClassCd = getNodeValue(jsonNode, SUBJECT_CLASS_CD, JsonNode::asText); + + assertDomainCdMatches(obsDomainCdSt1, ORDER); + if ("SPC".equals(typeCd) && "MAT".equals(subjectClassCd)) { + Long materialId = jsonNode.get(ENTITY_ID).asLong(); + observationTransformed.setMaterialId(materialId); + + ObservationMaterial material = objectMapper.treeToValue(jsonNode, ObservationMaterial.class); + material.setMaterialId(materialId); + ObservationMaterialKey key = new ObservationMaterialKey(); + key.setMaterialId(observationTransformed.getMaterialId()); + sendToKafka(key, material, materialTopicName, materialId, "Observation Material data (uid={}) sent to {}"); } } } catch (IllegalArgumentException ex) { - logger.info("MaterialParticipations array is null."); + logger.info(ex.getMessage(), "MaterialParticipations", materialParticipations); } catch (Exception e) { logger.error("Error processing Material Participation JSON array from observation data: {}", e.getMessage()); } @@ -183,17 +252,13 @@ private void transformFollowupObservations(String followupObservations, String o List results = new ArrayList<>(); List followUps = new ArrayList<>(); for (JsonNode jsonNode : followupObservationsJsonArray) { - String domainCdSt1 = getNodeValue(jsonNode.get("domain_cd_st_1")); + Optional domainCd = Optional.ofNullable(jsonNode.get("domain_cd_st_1")); + assertDomainCdMatches(obsDomainCdSt1, ORDER); - if (obsDomainCdSt1.equals(DOM_ORDER)) { - if (DOM_RESULT.equals(domainCdSt1)) { - Optional.ofNullable(jsonNode.get("result_observation_uid")).ifPresent(r -> results.add(r.asText())); - } - else { - Optional.ofNullable(jsonNode.get("result_observation_uid")).ifPresent(r -> followUps.add(r.asText())); - } + if (domainCd.isPresent() && RESULT.equals(domainCd.get().asText())) { + Optional.ofNullable(jsonNode.get("result_observation_uid")).ifPresent(r -> results.add(r.asText())); } else { - logger.error("obsDomainCdSt1: {} is not valid for the followupObservations", obsDomainCdSt1); + Optional.ofNullable(jsonNode.get("result_observation_uid")).ifPresent(r -> followUps.add(r.asText())); } } @@ -204,7 +269,7 @@ private void transformFollowupObservations(String followupObservations, String o observationTransformed.setFollowUpObservationUid(String.join(",", followUps)); } } catch (IllegalArgumentException ex) { - logger.info("FollowupObservations array is null."); + logger.info(ex.getMessage(), "FollowupObservations", followupObservations); } catch (Exception e) { logger.error("Error processing Followup Observations JSON array from observation data: {}", e.getMessage()); } @@ -215,38 +280,58 @@ private void transformParentObservations(String parentObservations, String obsDo JsonNode parentObservationsJsonArray = parseJsonArray(parentObservations); for (JsonNode jsonNode : parentObservationsJsonArray) { - Optional parentTypeCd = Optional.ofNullable(getNodeValue(jsonNode.get("parent_type_cd"))); - if (obsDomainCdSt1.equals(DOM_ORDER)) { - parentTypeCd.ifPresentOrElse(typeCd -> { - Optional parentUid = Optional.ofNullable(jsonNode.get("parent_uid")); - Optional observationUid = Optional.ofNullable(jsonNode.get("report_observation_uid")); - - switch (typeCd) { - case "SPRT": - parentUid.ifPresent(id -> observationTransformed.setReportSprtUid(id.asLong())); - observationUid.ifPresent(id -> observationTransformed.setReportObservationUid(id.asLong())); - break; - case "REFR": - parentUid.ifPresent(id -> observationTransformed.setReportRefrUid(id.asLong())); - observationUid.ifPresent(id -> observationTransformed.setReportObservationUid(id.asLong())); - break; - default: - parentUid.ifPresent(id -> observationTransformed.setReportObservationUid(id.asLong())); - break; - } - }, - () -> logger.error("Parent_type_cd is null for the parentObservations: {}", parentObservations)); + String parentTypeCd = getNodeValue(jsonNode, "parent_type_cd", JsonNode::asText); + Optional parentUid = Optional.ofNullable(jsonNode.get("parent_uid")); + + if (ORDER.equals(obsDomainCdSt1)) { + Optional observationUid = Optional.ofNullable(jsonNode.get("observation_uid")); + + switch (parentTypeCd) { + case "SPRT": + parentUid.ifPresent(id -> observationTransformed.setReportSprtUid(id.asLong())); + observationUid.ifPresent(id -> observationTransformed.setReportObservationUid(id.asLong())); + break; + case "REFR": + parentUid.ifPresent(id -> observationTransformed.setReportRefrUid(id.asLong())); + observationUid.ifPresent(id -> observationTransformed.setReportObservationUid(id.asLong())); + break; + default: + parentUid.ifPresent(id -> observationTransformed.setReportObservationUid(id.asLong())); + break; + } } else { - logger.error("obsDomainCdSt1: {} is not valid for the parentObservations", obsDomainCdSt1); + Optional.ofNullable(jsonNode.get("parent_domain_cd_st_1")).ifPresent( node -> { + if (node.asText().contains(ORDER)) { + parentUid.ifPresent(id -> observationTransformed.setReportObservationUid(id.asLong())); + } + }); } } } catch (IllegalArgumentException ex) { - logger.info("ParentObservations array is null."); + logger.info(ex.getMessage(), "ParentObservations", parentObservations); } catch (Exception e) { logger.error("Error processing Parent Observations JSON array from observation data: {}", e.getMessage()); } } + private void transformActIds(String actIds, ObservationTransformed observationTransformed) { + try { + JsonNode actIdsJsonArray = parseJsonArray(actIds); + + for (JsonNode jsonNode : actIdsJsonArray) { + String typeCd = getNodeValue(jsonNode, TYPE_CD, JsonNode::asText); + if (typeCd.equals("FN")) { + String rootExtTxt = getNodeValue(jsonNode, "root_extension_txt", JsonNode::asText); + observationTransformed.setAccessionNumber(rootExtTxt); + } + } + } catch (IllegalArgumentException ex) { + logger.info(ex.getMessage(), "ActIds", actIds); + } catch (Exception e) { + logger.error("Error processing Act Ids JSON array from observation data: {}", e.getMessage()); + } + } + private void transformObservationCoded(String observationCoded) { try { JsonNode observationCodedJsonArray = parseJsonArray(observationCoded); @@ -257,7 +342,7 @@ private void transformObservationCoded(String observationCoded) { sendToKafka(observationKey, coded, codedTopicName, coded.getObservationUid(), "Observation Coded data (uid={}) sent to {}"); } } catch (IllegalArgumentException ex) { - logger.info("ObservationCoded array is null."); + logger.info(ex.getMessage(), "ObservationCoded"); } catch (Exception e) { logger.error("Error processing Observation Coded JSON array from observation data: {}", e.getMessage()); } @@ -273,7 +358,7 @@ private void transformObservationDate(String observationDate) { sendToKafka(observationKey, coded, dateTopicName, coded.getObservationUid(), "Observation Date data (uid={}) sent to {}"); } } catch (IllegalArgumentException ex) { - logger.info("ObservationDate array is null."); + logger.info(ex.getMessage(), "ObservationDate"); } catch (Exception e) { logger.error("Error processing Observation Date JSON array from observation data: {}", e.getMessage()); } @@ -290,7 +375,7 @@ private void transformObservationEdx(String observationEdx) { sendToKafka(edxKey, edx, edxTopicName, edx.getEdxDocumentUid(), "Observation Edx data (edx doc uid={}) sent to {}"); } } catch (IllegalArgumentException ex) { - logger.info("ObservationEdx array is null."); + logger.info(ex.getMessage(), "ObservationEdx"); } catch (Exception e) { logger.error("Error processing Observation Edx JSON array from observation data: {}", e.getMessage()); } @@ -306,7 +391,7 @@ private void transformObservationNumeric(String observationNumeric) { sendToKafka(observationKey, numeric, numericTopicName, numeric.getObservationUid(), "Observation Numeric data (uid={}) sent to {}"); } } catch (IllegalArgumentException ex) { - logger.info("ObservationNumeric array is null."); + logger.info(ex.getMessage(), "ObservationNumeric"); } catch (Exception e) { logger.error("Error processing Observation Numeric JSON array from observation data: {}", e.getMessage()); } @@ -322,7 +407,7 @@ private void transformObservationReasons(String observationReasons) { sendToKafka(observationKey, reason, reasonTopicName, reason.getObservationUid(), "Observation Reason data (uid={}) sent to {}"); } } catch (IllegalArgumentException ex) { - logger.info("ObservationReasons array is null."); + logger.info(ex.getMessage(), "ObservationReasons"); } catch (Exception e) { logger.error("Error processing Observation Reasons JSON array from observation data: {}", e.getMessage()); } @@ -338,7 +423,7 @@ private void transformObservationTxt(String observationTxt) { sendToKafka(observationKey, txt, txtTopicName, txt.getObservationUid(), "Observation Txt data (uid={}) sent to {}"); } } catch (IllegalArgumentException ex) { - logger.info("ObservationTxt array is null."); + logger.info(ex.getMessage(), "ObservationTxt"); } catch (Exception e) { logger.error("Error processing Observation Txt JSON array from observation data: {}", e.getMessage()); } @@ -356,13 +441,21 @@ private JsonNode parseJsonArray(String jsonString) throws JsonProcessingExceptio if (jsonArray != null && jsonArray.isArray()) { return jsonArray; } else { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("{} array is null."); } } - private String getNodeValue(JsonNode jsonNode) { - return jsonNode == null || jsonNode.isNull() ? null : jsonNode.asText(); + private T getNodeValue(JsonNode jsonNode, String fieldName, Function mapper) { + JsonNode node = jsonNode.get(fieldName); + if (node == null || node.isNull()) { + throw new IllegalArgumentException("Field " + fieldName + " is null or not found in {}: {}"); + } + return mapper.apply(node); } - private final BiPredicate typeAndClassNull = (t, c) -> (t != null) && (c != null); + private void assertDomainCdMatches(String value, String... vals ) { + if (Arrays.stream(vals).noneMatch(value::equals)) { + throw new IllegalArgumentException("obsDomainCdSt1: " + value + " is not valid for the {}"); + } + } } diff --git a/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/ObservationDataProcessTests.java b/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/ObservationDataProcessTests.java index 1a56b247..6d54c3fc 100644 --- a/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/ObservationDataProcessTests.java +++ b/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/ObservationDataProcessTests.java @@ -9,6 +9,7 @@ import gov.cdc.etldatapipeline.observation.repository.model.dto.ObservationTransformed; import gov.cdc.etldatapipeline.observation.repository.model.reporting.*; import gov.cdc.etldatapipeline.observation.util.ProcessObservationDataUtil; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -105,6 +106,36 @@ void consolidatedDataTransformationTest() { Assertions.assertEquals("56789012,56789013", resultObsUid); } + @Test + void testPersonParticipationTransformation() { + Observation observation = new Observation(); + observation.setActUid(100000001L); + observation.setObsDomainCdSt1("Order"); + + final var expected = getObservationTransformed(); + + observation.setPersonParticipations(readFileData(FILE_PREFIX + "PersonParticipations.json")); + ObservationTransformed observationTransformed = transformer.transformObservationData(observation); + Assertions.assertEquals(expected, observationTransformed); + } + + @Test + void testMorbReportTransformation() { + Observation observation = new Observation(); + observation.setActUid(100000001L); + observation.setObsDomainCdSt1("Order"); + + final var expected = new ObservationTransformed(); + + expected.setPatientId(10000055L); + expected.setMorbPhysicianId(10000033L); + expected.setMorbReporterId(10000044L); + + observation.setPersonParticipations(readFileData(FILE_PREFIX + "PersonParticipationsMorb.json")); + ObservationTransformed observationTransformed = transformer.transformObservationData(observation); + Assertions.assertEquals(expected, observationTransformed); + } + @Test void testOrganizationParticipationTransformation() { Observation observation = new Observation(); @@ -124,7 +155,7 @@ void testOrganizationParticipationTransformation() { } @Test - void testOrganizationMaterialTransformation() throws JsonProcessingException { + void testObservationMaterialTransformation() throws JsonProcessingException { Observation observation = new Observation(); observation.setActUid(100000003L); observation.setObservationUid(100000003L); @@ -148,20 +179,21 @@ void testOrganizationMaterialTransformation() throws JsonProcessingException { assertEquals(material, actualMaterial); } - @Test - void testOrganizationParentTransformation() { + @ParameterizedTest + @CsvSource({"'Order'", "'Result'"}) + void testParentObservationsTransformation(String domainCd) { Observation observation = new Observation(); observation.setActUid(100000003L); observation.setObservationUid(100000003L); - observation.setObsDomainCdSt1("Order"); - observation.setParentObservations("[{\"parent_type_cd\": \"COMP\",\"parent_uid\": 234567888}]"); + observation.setParentObservations("[{\"parent_type_cd\":\"MorbFrmQ\",\"parent_uid\":234567888,\"parent_domain_cd_st_1\":\"R_Order\"}]"); + observation.setObsDomainCdSt1(domainCd); ObservationTransformed observationTransformed = transformer.transformObservationData(observation); assertEquals(234567888L, observationTransformed.getReportObservationUid()); } - @Test - void testOrganizationCodedTransformation() throws JsonProcessingException { + @Test + void testObservationCodedTransformation() throws JsonProcessingException { Observation observation = new Observation(); observation.setActUid(10001234L); observation.setObservationUid(10001234L); @@ -183,7 +215,7 @@ void testOrganizationCodedTransformation() throws JsonProcessingException { verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); assertEquals(CODED_TOPIC, topicCaptor.getValue()); List logs = listAppender.list; - assertTrue(logs.get(5).getFormattedMessage().contains("Observation Coded data (uid=10001234) sent to "+CODED_TOPIC)); + assertTrue(logs.get(6).getFormattedMessage().contains("Observation Coded data (uid=10001234) sent to "+CODED_TOPIC)); var actualCoded = objectMapper.readValue( objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), ObservationCoded.class); @@ -192,7 +224,7 @@ void testOrganizationCodedTransformation() throws JsonProcessingException { } @Test - void testOrganizationDateTransformation() throws JsonProcessingException { + void testObservationDateTransformation() throws JsonProcessingException { Observation observation = new Observation(); observation.setActUid(10001234L); observation.setObservationUid(10001234L); @@ -209,7 +241,7 @@ void testOrganizationDateTransformation() throws JsonProcessingException { verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); assertEquals(DATE_TOPIC, topicCaptor.getValue()); List logs = listAppender.list; - assertTrue(logs.get(6).getFormattedMessage().contains("Observation Date data (uid=10001234) sent to "+DATE_TOPIC)); + assertTrue(logs.get(7).getFormattedMessage().contains("Observation Date data (uid=10001234) sent to "+DATE_TOPIC)); var actualObd = objectMapper.readValue( objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), ObservationDate.class); @@ -218,7 +250,7 @@ void testOrganizationDateTransformation() throws JsonProcessingException { } @Test - void testOrganizationEdxTransformation() throws JsonProcessingException { + void testObservationEdxTransformation() throws JsonProcessingException { Observation observation = new Observation(); observation.setActUid(10001234L); observation.setObservationUid(10001234L); @@ -236,7 +268,7 @@ void testOrganizationEdxTransformation() throws JsonProcessingException { verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); assertEquals(EDX_TOPIC, topicCaptor.getValue()); List logs = listAppender.list; - assertTrue(logs.get(7).getFormattedMessage().contains("Observation Edx data (edx doc uid=10101) sent to "+EDX_TOPIC)); + assertTrue(logs.get(8).getFormattedMessage().contains("Observation Edx data (edx doc uid=10101) sent to "+EDX_TOPIC)); var actualEdx = objectMapper.readValue( objectMapper.readTree(messageCaptor.getAllValues().getFirst()).path("payload").toString(), ObservationEdx.class); @@ -245,7 +277,7 @@ void testOrganizationEdxTransformation() throws JsonProcessingException { } @Test - void testOrganizationNumericTransformation() throws JsonProcessingException { + void testObservationNumericTransformation() throws JsonProcessingException { Observation observation = new Observation(); observation.setActUid(10001234L); observation.setObservationUid(10001234L); @@ -268,7 +300,7 @@ void testOrganizationNumericTransformation() throws JsonProcessingException { verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); assertEquals(NUMERIC_TOPIC, topicCaptor.getValue()); List logs = listAppender.list; - assertTrue(logs.get(8).getFormattedMessage().contains("Observation Numeric data (uid=10001234) sent to "+NUMERIC_TOPIC)); + assertTrue(logs.get(9).getFormattedMessage().contains("Observation Numeric data (uid=10001234) sent to "+NUMERIC_TOPIC)); var actualNumeric = objectMapper.readValue( objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), ObservationNumeric.class); @@ -277,7 +309,7 @@ void testOrganizationNumericTransformation() throws JsonProcessingException { } @Test - void testOrganizationReasonTransformation() throws JsonProcessingException { + void testObservationReasonTransformation() throws JsonProcessingException { Observation observation = new Observation(); observation.setActUid(10001234L); observation.setObservationUid(10001234L); @@ -295,7 +327,7 @@ void testOrganizationReasonTransformation() throws JsonProcessingException { verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); assertEquals(REASON_TOPIC, topicCaptor.getValue()); List logs = listAppender.list; - assertTrue(logs.get(9).getFormattedMessage().contains("Observation Reason data (uid=10001234) sent to "+REASON_TOPIC)); + assertTrue(logs.get(10).getFormattedMessage().contains("Observation Reason data (uid=10001234) sent to "+REASON_TOPIC)); var actualReason = objectMapper.readValue( objectMapper.readTree(messageCaptor.getValue()).path("payload").toString(), ObservationReason.class); @@ -304,7 +336,7 @@ void testOrganizationReasonTransformation() throws JsonProcessingException { } @Test - void testOrganizationTxtTransformation() throws JsonProcessingException { + void testObservationTxtTransformation() throws JsonProcessingException { Observation observation = new Observation(); observation.setActUid(10001234L); observation.setObservationUid(10001234L); @@ -323,7 +355,7 @@ void testOrganizationTxtTransformation() throws JsonProcessingException { verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); assertEquals(TXT_TOPIC, topicCaptor.getValue()); List logs = listAppender.list; - assertTrue(logs.get(10).getFormattedMessage().contains("Observation Txt data (uid=10001234) sent to "+TXT_TOPIC)); + assertTrue(logs.get(11).getFormattedMessage().contains("Observation Txt data (uid=10001234) sent to "+TXT_TOPIC)); var actualTxt = objectMapper.readValue( objectMapper.readTree(messageCaptor.getAllValues().getFirst()).path("payload").toString(), ObservationTxt.class); @@ -338,7 +370,7 @@ void testTransformNoObservationData() { transformer.transformObservationData(observation); List logs = listAppender.list; - logs.forEach(le -> assertTrue(le.getFormattedMessage().matches("(\\w+) array is null."))); + logs.forEach(le -> assertTrue(le.getFormattedMessage().matches("^\\w+ array is null."))); } @Test @@ -351,6 +383,7 @@ void testTransformObservationDataError(){ observation.setMaterialParticipations(invalidJSON); observation.setFollowupObservations(invalidJSON); observation.setParentObservations(invalidJSON); + observation.setActIds(invalidJSON); observation.setObsCode(invalidJSON); observation.setObsDate(invalidJSON); observation.setEdxIds(invalidJSON); @@ -367,7 +400,7 @@ void testTransformObservationDataError(){ @Test void testTransformObservationInvalidDomainError(){ Observation observation = new Observation(); - String dummyJSON = "[{\"type_cd\":\"ORD\", \"subject_class_cd\": \"ORD\"}]"; + String dummyJSON = "[{\"type_cd\":\"ORD\",\"subject_class_cd\":\"ORD\",\"domain_cd_st_1\":\"Result\"}]"; String invalidDomain = "invalidDomain"; observation.setObservationUid(10000001L); observation.setObsDomainCdSt1(invalidDomain); @@ -388,19 +421,19 @@ void testTransformObservationInvalidDomainError(){ @Test void testTransformObservationResultDomainError(){ Observation observation = new Observation(); - String dummyJSON = "[{\"type_cd\": \"PRF\",\"subject_class_cd\": \"ORG\",\"entity_id\": 45678901}]"; + String dummyJSON = "[{\"type_cd\":\"PRF\",\"subject_class_cd\":\"ORG\",\"entity_id\":45678901,\"domain_cd_st_1\":\"Result\"}]"; + String invalidDomainCode = "Check"; - observation.setObsDomainCdSt1("Result"); + observation.setObsDomainCdSt1(invalidDomainCode); observation.setPersonParticipations(dummyJSON); observation.setOrganizationParticipations(dummyJSON); observation.setMaterialParticipations(dummyJSON); observation.setFollowupObservations(dummyJSON); - observation.setParentObservations(dummyJSON); transformer.transformObservationData(observation); List logs = listAppender.list.subList(0, 4); - logs.forEach(le -> assertTrue(le.getFormattedMessage().contains("Result is not valid"))); + logs.forEach(le -> assertTrue(le.getFormattedMessage().contains(invalidDomainCode + " is not valid"))); } @ParameterizedTest @@ -422,7 +455,34 @@ void testTransformObservationNullError(String payload){ transformer.transformObservationData(observation); List logs = listAppender.list.subList(0, 4); - logs.forEach(le -> assertTrue(le.getFormattedMessage().contains("_cd is null"))); + logs.forEach(le -> assertTrue(le.getFormattedMessage().matches("^Field \\w+ is null or not found.*"))); + } + + private @NotNull ObservationTransformed getObservationTransformed() { + ObservationTransformed expected = new ObservationTransformed(); + expected.setPatientId(10000066L); + expected.setOrderingPersonId(10000055L); + expected.setAssistantInterpreterId(10000077L); + expected.setAssistantInterpreterVal("22582"); + expected.setAssistantInterpreterFirstNm("Cara"); + expected.setAssistantInterpreterLastNm("Dune"); + expected.setAssistantInterpreterIdAssignAuth("22D7377772"); + expected.setAssistantInterpreterAuthType("Employee number"); + + expected.setTranscriptionistId(10000088L); + expected.setTranscriptionistVal("34344355455144"); + expected.setTranscriptionistFirstNm("Moff"); + expected.setTranscriptionistLastNm("Gideon"); + expected.setTranscriptionistIdAssignAuth("18D8181818"); + expected.setTranscriptionistAuthType("Employee number"); + + expected.setResultInterpreterId(10000022L); + expected.setLabTestTechnicianId(10000011L); + + expected.setSpecimenCollectorId(10000033L); + expected.setCopyToProviderId(10000044L); + + return expected; } private @NonNull ObservationMaterial constructObservationMaterial(Long actUid) { diff --git a/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/service/ObservationServiceTest.java b/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/service/ObservationServiceTest.java index e14df1b0..5283b52f 100644 --- a/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/service/ObservationServiceTest.java +++ b/observation-service/src/test/java/gov/cdc/etldatapipeline/observation/service/ObservationServiceTest.java @@ -11,14 +11,12 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; +import org.mockito.*; import org.springframework.kafka.core.KafkaTemplate; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import static gov.cdc.etldatapipeline.commonutil.TestUtils.readFileData; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -27,6 +25,9 @@ class ObservationServiceTest { + @InjectMocks + private ObservationService observationService; + @Mock private IObservationRepository observationRepository; @@ -42,11 +43,20 @@ class ObservationServiceTest { @Captor private ArgumentCaptor messageCaptor; + private final ObjectMapper objectMapper = new ObjectMapper(); private AutoCloseable closeable; + private final String inputTopicName = "Observation"; + private final String outputTopicName = "ObservationOutput"; + @BeforeEach void setUp() { closeable = MockitoAnnotations.openMocks(this); + ProcessObservationDataUtil transformer = new ProcessObservationDataUtil(kafkaTemplate); + transformer.setMaterialTopicName("materialTopic"); + observationService = new ObservationService(observationRepository, kafkaTemplate, transformer); + observationService.setObservationTopic(inputTopicName); + observationService.setObservationTopicOutputReporting(outputTopicName); } @AfterEach @@ -54,14 +64,8 @@ void closeService() throws Exception { closeable.close(); } - ProcessObservationDataUtil transformer = new ProcessObservationDataUtil(kafkaTemplate); - private final ObjectMapper objectMapper = new ObjectMapper(); - @Test void testProcessMessage() throws JsonProcessingException { - String observationTopic = "Observation"; - String observationTopicOutput = "ObservationOutput"; - // Mocked input data Long observationUid = 123456789L; String obsDomainCdSt = "Order"; @@ -69,47 +73,39 @@ void testProcessMessage() throws JsonProcessingException { Observation observation = constructObservation(observationUid, obsDomainCdSt); when(observationRepository.computeObservations(String.valueOf(observationUid))).thenReturn(Optional.of(observation)); + when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null)); - validateData(observationTopic, observationTopicOutput, payload, observation); + validateData(payload, observation); verify(observationRepository).computeObservations(String.valueOf(observationUid)); } @Test void testProcessMessageException() { - String observationTopic = "Observation"; - String observationTopicOutput = "ObservationOutput"; String invalidPayload = "{\"payload\": {\"after\": {}}}"; - final var observationService = getObservationService(observationTopic, observationTopicOutput); - RuntimeException ex = assertThrows(RuntimeException.class, () -> observationService.processMessage(invalidPayload, observationTopic)); + RuntimeException ex = assertThrows(RuntimeException.class, () -> observationService.processMessage(invalidPayload, inputTopicName)); assertEquals(ex.getCause().getClass(), NoSuchElementException.class); } @Test void testProcessMessageNoDataException() { - String observationTopic = "Observation"; - String observationTopicOutput = "ObservationOutput"; Long observationUid = 123456789L; String payload = "{\"payload\": {\"after\": {\"observation_uid\": \"" + observationUid + "\"}}}"; when(observationRepository.computeObservations(String.valueOf(observationUid))).thenReturn(Optional.empty()); - - final var observationService = getObservationService(observationTopic, observationTopicOutput); - assertThrows(NoDataException.class, () -> observationService.processMessage(payload, observationTopic)); + assertThrows(NoDataException.class, () -> observationService.processMessage(payload, inputTopicName)); } - private void validateData(String inputTopicName, String outputTopicName, - String payload, Observation observation) throws JsonProcessingException { - final var observationService = getObservationService(inputTopicName, outputTopicName); + private void validateData(String payload, Observation observation) throws JsonProcessingException { observationService.processMessage(payload, inputTopicName); ObservationKey observationKey = new ObservationKey(); observationKey.setObservationUid(observation.getObservationUid()); - ObservationReporting reportingModel = constructObservationReporting(observation.getObservationUid(), observation.getObsDomainCdSt1()); + var reportingModel = constructObservationReporting(observation.getObservationUid(), observation.getObsDomainCdSt1()); - verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); + verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture()); String actualTopic = topicCaptor.getValue(); String actualKey = keyCaptor.getValue(); String actualValue = messageCaptor.getValue(); @@ -129,12 +125,17 @@ private Observation constructObservation(Long observationUid, String obsDomainCd String filePathPrefix = "rawDataFiles/"; Observation observation = new Observation(); observation.setObservationUid(observationUid); + observation.setActUid(observationUid); + observation.setClassCd("OBS"); + observation.setMoodCd("ENV"); + observation.setLocalId("OBS10003388MA01"); observation.setObsDomainCdSt1(obsDomainCdSt1); observation.setPersonParticipations(readFileData(filePathPrefix + "PersonParticipations.json")); observation.setOrganizationParticipations(readFileData(filePathPrefix + "OrganizationParticipations.json")); observation.setMaterialParticipations(readFileData(filePathPrefix + "MaterialParticipations.json")); observation.setFollowupObservations(readFileData(filePathPrefix + "FollowupObservations.json")); observation.setParentObservations(readFileData(filePathPrefix + "ParentObservations.json")); + observation.setActIds(readFileData(filePathPrefix + "ActIds.json")); return observation; } @@ -142,24 +143,46 @@ private ObservationReporting constructObservationReporting(Long observationUid, ObservationReporting observation = new ObservationReporting(); observation.setObservationUid(observationUid); observation.setObsDomainCdSt1(obsDomainCdSt1); + observation.setActUid(observationUid); + observation.setClassCd("OBS"); + observation.setMoodCd("ENV"); + observation.setLocalId("OBS10003388MA01"); observation.setOrderingPersonId(10000055L); observation.setPatientId(10000066L); observation.setPerformingOrganizationId(null); // not null when obsDomainCdSt1=Result observation.setAuthorOrganizationId(34567890L); // null when obsDomainCdSt1=Result observation.setOrderingOrganizationId(23456789L); // null when obsDomainCdSt1=Result + observation.setHealthCareId(56789012L); // null when obsDomainCdSt1=Result + observation.setMorbHospReporterId(67890123L); // null when obsDomainCdSt1=Result + observation.setMorbHospId(78901234L); // null when obsDomainCdSt1=Result observation.setMaterialId(10000005L); observation.setResultObservationUid("56789012,56789013"); observation.setFollowupObservationUid("56789014,56789015"); observation.setReportObservationUid(observationUid); observation.setReportRefrUid(234567899L); observation.setReportSprtUid(234567888L); - return observation; - } - private ObservationService getObservationService(String inputTopicName, String outputTopicName) { - ObservationService observationService = new ObservationService(observationRepository, kafkaTemplate, transformer); - observationService.setObservationTopic(inputTopicName); - observationService.setObservationTopicOutputReporting(outputTopicName); - return observationService; + observation.setAssistantInterpreterId(10000077L); + observation.setAssistantInterpreterVal("22582"); + observation.setAssistantInterpreterFirstNm("Cara"); + observation.setAssistantInterpreterLastNm("Dune"); + observation.setAssistantInterpreterIdAssignAuth("22D7377772"); + observation.setAssistantInterpreterAuthType("Employee number"); + + observation.setTranscriptionistId(10000088L); + observation.setTranscriptionistVal("34344355455144"); + observation.setTranscriptionistFirstNm("Moff"); + observation.setTranscriptionistLastNm("Gideon"); + observation.setTranscriptionistIdAssignAuth("18D8181818"); + observation.setTranscriptionistAuthType("Employee number"); + + observation.setResultInterpreterId(10000022L); + observation.setLabTestTechnicianId(10000011L); + + observation.setSpecimenCollectorId(10000033L); + observation.setCopyToProviderId(10000044L); + observation.setAccessionNumber("20120601114"); + + return observation; } } diff --git a/observation-service/src/test/resources/rawDataFiles/ActIds.json b/observation-service/src/test/resources/rawDataFiles/ActIds.json new file mode 100644 index 00000000..122dad6f --- /dev/null +++ b/observation-service/src/test/resources/rawDataFiles/ActIds.json @@ -0,0 +1,20 @@ +[ + { + "id": 10361979, + "act_id_seq": 1, + "record_status": "ACTIVE", + "root_extension_txt": "20240829163135", + "type_cd": "MCID", + "type_desc_txt": "Message Control ID", + "act_last_change_time": null + }, + { + "id": 10361979, + "act_id_seq": 2, + "record_status": "ACTIVE", + "root_extension_txt": "20120601114", + "type_cd": "FN", + "type_desc_txt": "Filler Number", + "act_last_change_time": null + } +] \ No newline at end of file diff --git a/observation-service/src/test/resources/rawDataFiles/OrganizationParticipations.json b/observation-service/src/test/resources/rawDataFiles/OrganizationParticipations.json index 892030a6..dbfc6fe7 100644 --- a/observation-service/src/test/resources/rawDataFiles/OrganizationParticipations.json +++ b/observation-service/src/test/resources/rawDataFiles/OrganizationParticipations.json @@ -31,5 +31,38 @@ "last_change_time": "2024-01-01T00:00:00.000", "name": "Mount-Auburn Health", "org_last_change_time": "2011-09-13T00:00:00.000" + }, + { + "act_uid": 10000003, + "type_cd": "HCFAC", + "entity_id": 56789012, + "subject_class_cd": "ORG", + "record_status": "ACTIVE", + "type_desc_txt": "Health Care Fraud and Abuse Control", + "last_change_time": null, + "name": "ACL", + "org_last_change_time": "2011-09-13T00:00:00.000" + }, + { + "act_uid": 10000003, + "type_cd": "ReporterOfMorbReport", + "entity_id": 67890123, + "subject_class_cd": "ORG", + "record_status": "ACTIVE", + "type_desc_txt": "Reporter Of Morbidity Report", + "last_change_time": "2011-09-13T00:00:00.000", + "name": "Emory University Hospital", + "org_last_change_time": "2011-09-13T00:00:00.000" + }, + { + "act_uid": 10000003, + "type_cd": "HospOfMorbObs", + "entity_id": 78901234, + "subject_class_cd": "ORG", + "record_status": "ACTIVE", + "type_desc_txt": "Hospice Of Morbidity Observation", + "last_change_time": "2011-09-13T00:00:00.000", + "name": "HSL", + "org_last_change_time": "2011-09-13T00:00:00.000" } ] diff --git a/observation-service/src/test/resources/rawDataFiles/ParentObservations.json b/observation-service/src/test/resources/rawDataFiles/ParentObservations.json index c06129a9..f0889119 100644 --- a/observation-service/src/test/resources/rawDataFiles/ParentObservations.json +++ b/observation-service/src/test/resources/rawDataFiles/ParentObservations.json @@ -1,7 +1,7 @@ [ { "parent_type_cd": "SPRT", - "report_observation_uid": 123456789, + "observation_uid": 123456789, "parent_uid": 234567888, "parent_cd": "634-6", "parent_cd_desc_txt": "Bacteria identified in Unspecified specimen by Aerobe culture", @@ -9,7 +9,7 @@ }, { "parent_type_cd": "REFR", - "report_observation_uid": 123456789, + "observation_uid": 123456789, "parent_uid": 234567899, "parent_cd": "634-6", "parent_cd_desc_txt": "Bacteria identified in Unspecified specimen by Aerobe culture", diff --git a/observation-service/src/test/resources/rawDataFiles/PersonParticipations.json b/observation-service/src/test/resources/rawDataFiles/PersonParticipations.json index 9c7ddb6c..b70217ac 100644 --- a/observation-service/src/test/resources/rawDataFiles/PersonParticipations.json +++ b/observation-service/src/test/resources/rawDataFiles/PersonParticipations.json @@ -7,15 +7,20 @@ "participation_record_status": "ACTIVE", "participation_last_change_time": "2024-01-25T00:00:00.000", "type_desc_txt": "Orderer", - "first_name": "Din", - "last_name": "Djarin", - "local_id": "PSN10000000AL01", - "birth_time": null, - "curr_sex_cd": null, "person_cd": "PRV", "person_parent_uid": 12345678, "person_record_status": "ACTIVE", - "person_last_chg_time": "2009-11-01T00:00:00.000" + "person_last_chg_time": "2009-11-01T00:00:00.000", + "person_id_val": "8900111", + "person_id_type": "PRN", + "person_id_assign_auth_cd": "18D8181818", + "entity_record_status_cd": "ACTIVE", + "person_id_type_desc": "Provider Number", + "last_nm": "Djarin", + "first_nm": "Din", + "role_cd": "OP", + "role_subject_class_cd": "PRV", + "role_scoping_class_cd": "PAT" }, { "act_uid": 10000003, @@ -25,14 +30,157 @@ "participation_record_status": "ACTIVE", "participation_last_change_time": null, "type_desc_txt": "Patient Subject", - "first_name": "Boba", - "last_name": "Fett", - "local_id": "PSN40000004AL01", - "birth_time": "1990-03-13T00:00:00", - "curr_sex_cd": "M", "person_cd": "PAT", "person_parent_uid": 23456789, "person_record_status": "ACTIVE", - "person_last_chg_time": "2024-01-01T00:00:00.000" + "person_last_chg_time": "2024-01-01T00:00:00.000", + "person_id_val": "222-33-4444", + "person_id_type": "SS", + "person_id_assign_auth_cd": "18D8181818", + "entity_record_status_cd": "ACTIVE", + "person_id_type_desc": "Social Security", + "last_nm": "Fett", + "first_nm": "Boba", + "role_cd": "PAT", + "role_subject_class_cd": "PATIENT", + "role_scoping_class_cd": null + }, + { + "act_uid": 10000003, + "type_cd": "ASS", + "entity_id": 10000077, + "subject_class_cd": "PSN", + "participation_record_status": "ACTIVE", + "participation_last_change_time": "2024-01-01T00:00:00.000", + "type_desc_txt": "Assistant", + "person_cd": "PRV", + "person_parent_uid": 10361973, + "person_record_status": "ACTIVE", + "person_last_chg_time": "2024-01-01T00:00:00.000", + "person_id_val": "22582", + "person_id_type": "EI", + "person_id_assign_auth_cd": "22D7377772", + "entity_record_status_cd": "ACTIVE", + "person_id_type_desc": "Employee number", + "last_nm": "Dune", + "first_nm": "Cara", + "role_cd": "LABP", + "role_subject_class_cd": "PRV", + "role_scoping_class_cd": "PAT" + }, + { + "act_uid": 10000003, + "type_cd": "ENT", + "entity_id": 10000088, + "subject_class_cd": "PSN", + "participation_record_status": "ACTIVE", + "participation_last_change_time": null, + "type_desc_txt": "Enterer", + "person_cd": "PRV", + "person_parent_uid": 23456789, + "person_record_status": "ACTIVE", + "person_last_chg_time": "2024-01-01T00:00:00.000", + "person_id_val": "34344355455144", + "person_id_type": "EI", + "person_id_assign_auth_cd": "18D8181818", + "entity_record_status_cd": "ACTIVE", + "person_id_type_desc": "Employee number", + "last_nm": "Gideon", + "first_nm": "Moff", + "role_cd": "LABP", + "role_subject_class_cd": "PATIENT", + "role_scoping_class_cd": null + }, + { + "act_uid": 10000003, + "type_cd": "PRF", + "entity_id": 10000011, + "subject_class_cd": "PSN", + "participation_record_status": "ACTIVE", + "participation_last_change_time": "2024-01-01T00:00:00.000", + "type_desc_txt": "Performer", + "person_cd": "PRV", + "person_parent_uid": 10361974, + "person_record_status": "ACTIVE", + "person_last_chg_time": "2024-01-01T00:00:00.000", + "person_id_val": "44", + "person_id_type": "EI", + "person_id_assign_auth_cd": "22D7377772", + "entity_record_status_cd": "ACTIVE", + "person_id_type_desc": "Employee number", + "last_nm": "Pershing", + "first_nm": "Penn", + "role_cd": "LABP", + "role_subject_class_cd": "PRV", + "role_scoping_class_cd": "PAT" + }, + { + "act_uid": 10000003, + "type_cd": "VRF", + "entity_id": 10000022, + "subject_class_cd": "PSN", + "participation_record_status": "ACTIVE", + "participation_last_change_time": "2024-01-01T00:00:00.000", + "type_desc_txt": "Verifier", + "person_cd": "PRV", + "person_parent_uid": 10361973, + "person_record_status": "ACTIVE", + "person_last_chg_time": "2024-01-01T00:00:00.000", + "person_id_val": "22582", + "person_id_type": "EI", + "person_id_assign_auth_cd": "22D7377772", + "entity_record_status_cd": "ACTIVE", + "person_id_type_desc": "Employee number", + "last_nm": "Kane", + "first_nm": "Elia", + "role_cd": "LABP", + "role_subject_class_cd": "PRV", + "role_scoping_class_cd": "PAT" + }, + { + "act_uid": 10000003, + "type_cd": "PATSBJ", + "entity_id": 10000033, + "subject_class_cd": "", + "participation_record_status": "ACTIVE", + "participation_last_change_time": null, + "type_desc_txt": "Patient Subject", + "person_cd": "PAT", + "person_parent_uid": 23456789, + "person_record_status": "ACTIVE", + "person_last_chg_time": "2024-01-01T00:00:00.000", + "person_id_val": "222-33-4444", + "person_id_type": "SS", + "person_id_assign_auth_cd": "18D8181818", + "entity_record_status_cd": "ACTIVE", + "person_id_type_desc": "Social Security", + "last_nm": "Fett", + "first_nm": "Boba", + "role_cd": "SPP", + "role_subject_class_cd": "PROV", + "role_scoping_class_cd": "PSN" + }, + { + "act_uid": 10000003, + "type_cd": "PATSBJ", + "entity_id": 10000044, + "subject_class_cd": "", + "participation_record_status": "ACTIVE", + "participation_last_change_time": null, + "type_desc_txt": "Patient Subject", + "person_cd": "PAT", + "person_parent_uid": 23456789, + "person_record_status": "ACTIVE", + "person_last_chg_time": "2024-01-01T00:00:00.000", + "person_id_val": "222-33-4444", + "person_id_type": "SS", + "person_id_assign_auth_cd": "18D8181818", + "entity_record_status_cd": "ACTIVE", + "person_id_type_desc": "Social Security", + "last_nm": "Fett", + "first_nm": "Boba", + "role_cd": "CT", + "role_subject_class_cd": "PROV", + "role_scoping_class_cd": null } ] \ No newline at end of file diff --git a/observation-service/src/test/resources/rawDataFiles/PersonParticipationsMorb.json b/observation-service/src/test/resources/rawDataFiles/PersonParticipationsMorb.json new file mode 100644 index 00000000..3b7c7b4b --- /dev/null +++ b/observation-service/src/test/resources/rawDataFiles/PersonParticipationsMorb.json @@ -0,0 +1,71 @@ +[ + { + "act_uid": 10000003, + "type_cd": "PhysicianOfMorb", + "entity_id": 10000033, + "subject_class_cd": "PSN", + "participation_record_status": "ACTIVE", + "participation_last_change_time": "2024-01-01T00:00:00.000", + "type_desc_txt": "Physician Of Morbidity Report", + "person_cd": "PRV", + "person_parent_uid": 10003010, + "person_record_status": "ACTIVE", + "person_last_chg_time": "2024-01-01T00:00:00.000", + "person_id_val": "2", + "person_id_type": "QEC", + "person_id_assign_auth_cd": null, + "entity_record_status_cd": "ACTIVE", + "person_id_type_desc": null, + "last_nm": "Kripke", + "first_nm": "Barry", + "role_cd": "NRS", + "role_subject_class_cd": "PROV", + "role_scoping_class_cd": null + }, + { + "act_uid": 10000003, + "type_cd": "ReporterOfMorbReport", + "entity_id": 10000044, + "subject_class_cd": "PSN", + "participation_record_status": "ACTIVE", + "participation_last_change_time": "2024-01-01T00:00:00.000", + "type_desc_txt": "Reporter Of Morbidity Report", + "person_cd": "PRV", + "person_parent_uid": 10003013, + "person_record_status": "ACTIVE", + "person_last_chg_time": "2024-01-01T00:00:00.000", + "person_id_val": "3", + "person_id_type": "QEC", + "person_id_assign_auth_cd": null, + "entity_record_status_cd": "ACTIVE", + "person_id_type_desc": null, + "last_nm": "Winkle", + "first_nm": "Leslie", + "role_cd": "HO", + "role_subject_class_cd": "PROV", + "role_scoping_class_cd": null + }, + { + "act_uid": 10000003, + "type_cd": "SubjOfMorbReport", + "entity_id": 10000055, + "subject_class_cd": "PSN", + "participation_record_status": "ACTIVE", + "participation_last_change_time": "2024-01-01T00:00:00.000", + "type_desc_txt": "Subject Of Morbidity Report", + "person_cd": "PAT", + "person_parent_uid": 10362019, + "person_record_status": "ACTIVE", + "person_last_chg_time": "2024-01-01T00:00:00.000", + "person_id_val": null, + "person_id_type": null, + "person_id_assign_auth_cd": null, + "entity_record_status_cd": null, + "person_id_type_desc": null, + "last_nm": "Tran", + "first_nm": "Cali", + "role_cd": null, + "role_subject_class_cd": null, + "role_scoping_class_cd": null + } +] \ No newline at end of file