From 83cbfa3d6d6a60670430ee35b6a008ce72d9541e Mon Sep 17 00:00:00 2001 From: Jayasudha Sundaram Date: Fri, 17 Jan 2025 12:43:08 -0500 Subject: [PATCH] CNDE-2055: Post-process service integration for RDB D_PLACE (#134) --- .../repository/PostProcRepository.java | 3 + .../service/PostProcessingService.java | 22 +++--- .../src/main/resources/application.yaml | 1 + .../service/PostProcessingServiceTest.java | 69 +++++++++++++++++-- 4 files changed, 79 insertions(+), 16 deletions(-) diff --git a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/repository/PostProcRepository.java b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/repository/PostProcRepository.java index f970f95a..ac58f0de 100644 --- a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/repository/PostProcRepository.java +++ b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/repository/PostProcRepository.java @@ -40,4 +40,7 @@ public interface PostProcRepository extends JpaRepository { @Procedure("sp_f_interview_case_postprocessing") void executeStoredProcForFInterviewCase(@Param("interviewUids") String interviewUids); + + @Procedure("sp_nrt_place_postprocessing") + void executeStoredProcForDPlace(@Param("placeUids") String placeUids); } diff --git a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java index f5004452..98a903ad 100644 --- a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java +++ b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java @@ -66,12 +66,13 @@ enum Entity { ORGANIZATION(1, "organization", "organization_uid", "sp_nrt_organization_postprocessing"), PROVIDER(2, "provider", "provider_uid", "sp_nrt_provider_postprocessing"), PATIENT(3, "patient", "patient_uid", "sp_nrt_patient_postprocessing"), - INVESTIGATION(4, "investigation", PHC_UID, "sp_nrt_investigation_postprocessing"), - NOTIFICATION(5, "notification", "notification_uid", "sp_nrt_notification_postprocessing"), - INTERVIEW(6, "interview", "interview_uid", "sp_d_interview_postprocessing"), - CASE_MANAGEMENT(7, "case_management", PHC_UID, "sp_nrt_case_management_postprocessing"), - LDF_DATA(8, "ldf_data", "ldf_uid", "sp_nrt_ldf_postprocessing"), - OBSERVATION(9, "observation", "observation_uid", null), + D_PLACE(4, "place", "place_uid", "sp_nrt_place_postprocessing"), + INVESTIGATION(5, "investigation", PHC_UID, "sp_nrt_investigation_postprocessing"), + NOTIFICATION(6, "notification", "notification_uid", "sp_nrt_notification_postprocessing"), + INTERVIEW(7, "interview", "interview_uid", "sp_d_interview_postprocessing"), + CASE_MANAGEMENT(8, "case_management", PHC_UID, "sp_nrt_case_management_postprocessing"), + LDF_DATA(9, "ldf_data", "ldf_uid", "sp_nrt_ldf_postprocessing"), + OBSERVATION(10, "observation", "observation_uid", null), F_PAGE_CASE(0, "fact page case", PHC_UID, "sp_f_page_case_postprocessing"), CASE_ANSWERS(0, "case answers", PHC_UID, "sp_page_builder_postprocessing"), CASE_COUNT(0, "case count", PHC_UID, "sp_nrt_case_count_postprocessing"), @@ -118,7 +119,8 @@ enum Entity { "${spring.kafka.topic.case_management}", "${spring.kafka.topic.interview}", "${spring.kafka.topic.ldf_data}", - "${spring.kafka.topic.observation}" + "${spring.kafka.topic.observation}", + "${spring.kafka.topic.place}" }) public void postProcessMessage( @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @@ -221,8 +223,7 @@ protected void processCachedIds() { Entity entity = getEntityByTopic(keyTopic); switch (entity) { case ORGANIZATION: - processTopic(keyTopic, entity, ids, - postProcRepository::executeStoredProcForOrganizationIds); + processTopic(keyTopic, entity, ids, postProcRepository::executeStoredProcForOrganizationIds); break; case PROVIDER: processTopic(keyTopic, entity, ids, postProcRepository::executeStoredProcForProviderIds); @@ -230,6 +231,9 @@ protected void processCachedIds() { case PATIENT: processTopic(keyTopic, entity, ids, postProcRepository::executeStoredProcForPatientIds); break; + case D_PLACE: + processTopic(keyTopic, entity, ids, postProcRepository::executeStoredProcForDPlace); + break; case INVESTIGATION: dmData = processTopic(keyTopic, entity, ids, investigationRepository::executeStoredProcForPublicHealthCaseIds); diff --git a/post-processing-service/src/main/resources/application.yaml b/post-processing-service/src/main/resources/application.yaml index 3dba28d7..e816798f 100644 --- a/post-processing-service/src/main/resources/application.yaml +++ b/post-processing-service/src/main/resources/application.yaml @@ -10,6 +10,7 @@ spring: case_management: nrt_investigation_case_management interview: nrt_interview ldf_data: nrt_ldf_data + place: nrt_place datamart: nbs_Datamart dlq: retry-suffix: -retry diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java index 4b769723..76ce4beb 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java @@ -67,6 +67,7 @@ public void tearDown() throws Exception { @CsvSource({ "dummy_patient, '{\"payload\":{\"patient_uid\":123}}', 123", "dummy_provider, '{\"payload\":{\"provider_uid\":123}}', 123", + "dummy_place, '{\"payload\":{\"place_uid\":123}}', 123", "dummy_organization, '{\"payload\":{\"organization_uid\":123}}', 123", "dummy_investigation, '{\"payload\":{\"public_health_case_uid\":123}}', 123", "dummy_notification, '{\"payload\":{\"notification_uid\":123}}', 123", @@ -345,22 +346,30 @@ void testPostProcessMultipleMessages() { String ntfKey2 = "{\"payload\":{\"notification_uid\":568}}"; String ntfTopic = "dummy_notification"; + String placeKey1 = "{\"payload\":{\"place_uid\":123}}"; + String placeKey2 = "{\"payload\":{\"place_uid\":124}}"; + String placeTopic = "dummy_place"; + postProcessingServiceMock.postProcessMessage(orgTopic, orgKey1, orgKey1); postProcessingServiceMock.postProcessMessage(orgTopic, orgKey2, orgKey2); postProcessingServiceMock.postProcessMessage(ntfTopic, ntfKey1, ntfKey1); postProcessingServiceMock.postProcessMessage(ntfTopic, ntfKey2, ntfKey2); postProcessingServiceMock.postProcessMessage(invTopic, invKey1, invKey1); postProcessingServiceMock.postProcessMessage(invTopic, invKey2, invKey2); + postProcessingServiceMock.postProcessMessage(placeTopic, placeKey1, placeKey1); + postProcessingServiceMock.postProcessMessage(placeTopic, placeKey2, placeKey2); assertTrue(postProcessingServiceMock.idCache.containsKey(orgTopic)); assertTrue(postProcessingServiceMock.idCache.containsKey(invTopic)); assertTrue(postProcessingServiceMock.idCache.containsKey(ntfTopic)); + assertTrue(postProcessingServiceMock.idCache.containsKey(placeTopic)); postProcessingServiceMock.processCachedIds(); verify(postProcRepositoryMock).executeStoredProcForOrganizationIds("123,124"); verify(investigationRepositoryMock).executeStoredProcForPublicHealthCaseIds("234,235"); verify(investigationRepositoryMock).executeStoredProcForNotificationIds("567,568"); + verify(postProcRepositoryMock).executeStoredProcForDPlace("123,124"); } @Test @@ -368,18 +377,20 @@ void testPostProcessCacheIdsPriority() { String orgKey = "{\"payload\":{\"organization_uid\":123}}"; String providerKey = "{\"payload\":{\"provider_uid\":124}}"; String patientKey = "{\"payload\":{\"patient_uid\":125}}"; + String placeKey = "{\"payload\":{\"place_uid\":131}}"; String investigationKey = "{\"payload\":{\"public_health_case_uid\":126}}"; String notificationKey = "{\"payload\":{\"notification_uid\":127}}"; String caseManagementKey = "{\"payload\":{\"public_health_case_uid\":128,\"case_management_uid\":1001}}"; String ldfKey = "{\"payload\":{\"ldf_uid\":129}}"; String interviewKey = "{\"payload\":{\"interview_uid\":130}}"; - String observationKey = "{\"payload\":{\"observation_uid\":130}}"; String observationMsg = "{\"payload\":{\"observation_uid\":130, \"obs_domain_cd_st_1\": \"Order\",\"ctrl_cd_display_form\": \"MorbReport\"}}"; + String orgTopic = "dummy_organization"; String providerTopic = "dummy_provider"; String patientTopic = "dummy_patient"; + String placeTopic = "dummy_place"; String invTopic = "dummy_investigation"; String ntfTopic = "dummy_notification"; String intTopic = "dummy_interview"; @@ -388,9 +399,11 @@ void testPostProcessCacheIdsPriority() { String obsTopic = "dummy_observation"; + postProcessingServiceMock.postProcessMessage(invTopic, investigationKey, investigationKey); postProcessingServiceMock.postProcessMessage(providerTopic, providerKey, providerKey); postProcessingServiceMock.postProcessMessage(patientTopic, patientKey, patientKey); + postProcessingServiceMock.postProcessMessage(placeTopic, placeKey, placeKey); postProcessingServiceMock.postProcessMessage(intTopic, interviewKey, interviewKey); postProcessingServiceMock.postProcessMessage(ntfTopic, notificationKey, notificationKey); postProcessingServiceMock.postProcessMessage(orgTopic, orgKey, orgKey); @@ -406,16 +419,17 @@ void testPostProcessCacheIdsPriority() { assertTrue(topicLogList.get(0).contains(orgTopic)); assertTrue(topicLogList.get(1).contains(providerTopic)); assertTrue(topicLogList.get(2).contains(patientTopic)); - assertTrue(topicLogList.get(3).contains(invTopic)); + assertTrue(topicLogList.get(3).contains(placeTopic)); assertTrue(topicLogList.get(4).contains(invTopic)); assertTrue(topicLogList.get(5).contains(invTopic)); - assertTrue(topicLogList.get(6).contains(ntfTopic)); - assertTrue(topicLogList.get(7).contains(intTopic)); + assertTrue(topicLogList.get(6).contains(invTopic)); + assertTrue(topicLogList.get(7).contains(ntfTopic)); assertTrue(topicLogList.get(8).contains(intTopic)); - assertTrue(topicLogList.get(9).contains(cmTopic)); + assertTrue(topicLogList.get(9).contains(intTopic)); assertTrue(topicLogList.get(10).contains(cmTopic)); - assertTrue(topicLogList.get(11).contains(ldfTopic)); - assertTrue(topicLogList.get(12).contains(obsTopic)); + assertTrue(topicLogList.get(11).contains(cmTopic)); + assertTrue(topicLogList.get(12).contains(ldfTopic)); + assertTrue(topicLogList.get(13).contains(obsTopic)); } @Test @@ -496,6 +510,47 @@ void testProduceDatamartTopicWithNoPatient() { verify(kafkaTemplate, never()).send(anyString(), anyString(), anyString()); } + @Test + void testPostProcessPlaceMessage() { + String topic = "dummy_place"; + String key = "{\"payload\":{\"place_uid\":123}}"; + + postProcessingServiceMock.postProcessMessage(topic, key, key); + postProcessingServiceMock.processCachedIds(); + + String expectedPlaceIdsString = "123"; + verify(postProcRepositoryMock).executeStoredProcForDPlace(expectedPlaceIdsString); + + List logs = listAppender.list; + assertEquals(4, logs.size()); + assertTrue(logs.get(2).getFormattedMessage().contains(PostProcessingService.Entity.D_PLACE.getStoredProcedure())); + assertTrue(logs.get(3).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); + } + + @Test + void testPostProcessMultipleMessages_WithPlace() { + String placeKey1 = "{\"payload\":{\"place_uid\":123}}"; + String placeKey2 = "{\"payload\":{\"place_uid\":124}}"; + String placeTopic = "dummy_place"; + + postProcessingServiceMock.postProcessMessage(placeTopic, placeKey1, placeKey1); + postProcessingServiceMock.postProcessMessage(placeTopic, placeKey2, placeKey2); + + assertTrue(postProcessingServiceMock.idCache.containsKey(placeTopic)); + + postProcessingServiceMock.processCachedIds(); + + verify(postProcRepositoryMock).executeStoredProcForDPlace("123,124"); + } + @Test + void testPostProcessNoPlaceUidException() { + String placeKey = "{\"payload\":{}}"; + String topic = "dummy_place"; + + RuntimeException ex = assertThrows(RuntimeException.class, + () -> postProcessingServiceMock.postProcessMessage(topic, placeKey, placeKey)); + assertEquals(NoSuchElementException.class, ex.getCause().getClass()); + } @ParameterizedTest @CsvSource({ "'{\"payload\":{\"public_health_case_uid\":123,\"rdb_table_name_list\":null}}'",