Skip to content

Commit

Permalink
CNDE-2055: Post-process service integration for RDB D_PLACE (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
jayasudhasundaram authored Jan 17, 2025
1 parent 74a3bf1 commit 83cbfa3
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,7 @@ public interface PostProcRepository extends JpaRepository<PostProcSp, Long> {

@Procedure("sp_f_interview_case_postprocessing")
void executeStoredProcForFInterviewCase(@Param("interviewUids") String interviewUids);

@Procedure("sp_nrt_place_postprocessing")
void executeStoredProcForDPlace(@Param("placeUids") String placeUids);
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@ enum Entity {
ORGANIZATION(1, "organization", "organization_uid", "sp_nrt_organization_postprocessing"),
PROVIDER(2, "provider", "provider_uid", "sp_nrt_provider_postprocessing"),
PATIENT(3, "patient", "patient_uid", "sp_nrt_patient_postprocessing"),
INVESTIGATION(4, "investigation", PHC_UID, "sp_nrt_investigation_postprocessing"),
NOTIFICATION(5, "notification", "notification_uid", "sp_nrt_notification_postprocessing"),
INTERVIEW(6, "interview", "interview_uid", "sp_d_interview_postprocessing"),
CASE_MANAGEMENT(7, "case_management", PHC_UID, "sp_nrt_case_management_postprocessing"),
LDF_DATA(8, "ldf_data", "ldf_uid", "sp_nrt_ldf_postprocessing"),
OBSERVATION(9, "observation", "observation_uid", null),
D_PLACE(4, "place", "place_uid", "sp_nrt_place_postprocessing"),
INVESTIGATION(5, "investigation", PHC_UID, "sp_nrt_investigation_postprocessing"),
NOTIFICATION(6, "notification", "notification_uid", "sp_nrt_notification_postprocessing"),
INTERVIEW(7, "interview", "interview_uid", "sp_d_interview_postprocessing"),
CASE_MANAGEMENT(8, "case_management", PHC_UID, "sp_nrt_case_management_postprocessing"),
LDF_DATA(9, "ldf_data", "ldf_uid", "sp_nrt_ldf_postprocessing"),
OBSERVATION(10, "observation", "observation_uid", null),
F_PAGE_CASE(0, "fact page case", PHC_UID, "sp_f_page_case_postprocessing"),
CASE_ANSWERS(0, "case answers", PHC_UID, "sp_page_builder_postprocessing"),
CASE_COUNT(0, "case count", PHC_UID, "sp_nrt_case_count_postprocessing"),
Expand Down Expand Up @@ -118,7 +119,8 @@ enum Entity {
"${spring.kafka.topic.case_management}",
"${spring.kafka.topic.interview}",
"${spring.kafka.topic.ldf_data}",
"${spring.kafka.topic.observation}"
"${spring.kafka.topic.observation}",
"${spring.kafka.topic.place}"
})
public void postProcessMessage(
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Expand Down Expand Up @@ -221,15 +223,17 @@ protected void processCachedIds() {
Entity entity = getEntityByTopic(keyTopic);
switch (entity) {
case ORGANIZATION:
processTopic(keyTopic, entity, ids,
postProcRepository::executeStoredProcForOrganizationIds);
processTopic(keyTopic, entity, ids, postProcRepository::executeStoredProcForOrganizationIds);
break;
case PROVIDER:
processTopic(keyTopic, entity, ids, postProcRepository::executeStoredProcForProviderIds);
break;
case PATIENT:
processTopic(keyTopic, entity, ids, postProcRepository::executeStoredProcForPatientIds);
break;
case D_PLACE:
processTopic(keyTopic, entity, ids, postProcRepository::executeStoredProcForDPlace);
break;
case INVESTIGATION:
dmData = processTopic(keyTopic, entity, ids,
investigationRepository::executeStoredProcForPublicHealthCaseIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ spring:
case_management: nrt_investigation_case_management
interview: nrt_interview
ldf_data: nrt_ldf_data
place: nrt_place
datamart: nbs_Datamart
dlq:
retry-suffix: -retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public void tearDown() throws Exception {
@CsvSource({
"dummy_patient, '{\"payload\":{\"patient_uid\":123}}', 123",
"dummy_provider, '{\"payload\":{\"provider_uid\":123}}', 123",
"dummy_place, '{\"payload\":{\"place_uid\":123}}', 123",
"dummy_organization, '{\"payload\":{\"organization_uid\":123}}', 123",
"dummy_investigation, '{\"payload\":{\"public_health_case_uid\":123}}', 123",
"dummy_notification, '{\"payload\":{\"notification_uid\":123}}', 123",
Expand Down Expand Up @@ -345,41 +346,51 @@ void testPostProcessMultipleMessages() {
String ntfKey2 = "{\"payload\":{\"notification_uid\":568}}";
String ntfTopic = "dummy_notification";

String placeKey1 = "{\"payload\":{\"place_uid\":123}}";
String placeKey2 = "{\"payload\":{\"place_uid\":124}}";
String placeTopic = "dummy_place";

postProcessingServiceMock.postProcessMessage(orgTopic, orgKey1, orgKey1);
postProcessingServiceMock.postProcessMessage(orgTopic, orgKey2, orgKey2);
postProcessingServiceMock.postProcessMessage(ntfTopic, ntfKey1, ntfKey1);
postProcessingServiceMock.postProcessMessage(ntfTopic, ntfKey2, ntfKey2);
postProcessingServiceMock.postProcessMessage(invTopic, invKey1, invKey1);
postProcessingServiceMock.postProcessMessage(invTopic, invKey2, invKey2);
postProcessingServiceMock.postProcessMessage(placeTopic, placeKey1, placeKey1);
postProcessingServiceMock.postProcessMessage(placeTopic, placeKey2, placeKey2);

assertTrue(postProcessingServiceMock.idCache.containsKey(orgTopic));
assertTrue(postProcessingServiceMock.idCache.containsKey(invTopic));
assertTrue(postProcessingServiceMock.idCache.containsKey(ntfTopic));
assertTrue(postProcessingServiceMock.idCache.containsKey(placeTopic));

postProcessingServiceMock.processCachedIds();

verify(postProcRepositoryMock).executeStoredProcForOrganizationIds("123,124");
verify(investigationRepositoryMock).executeStoredProcForPublicHealthCaseIds("234,235");
verify(investigationRepositoryMock).executeStoredProcForNotificationIds("567,568");
verify(postProcRepositoryMock).executeStoredProcForDPlace("123,124");
}

@Test
void testPostProcessCacheIdsPriority() {
String orgKey = "{\"payload\":{\"organization_uid\":123}}";
String providerKey = "{\"payload\":{\"provider_uid\":124}}";
String patientKey = "{\"payload\":{\"patient_uid\":125}}";
String placeKey = "{\"payload\":{\"place_uid\":131}}";
String investigationKey = "{\"payload\":{\"public_health_case_uid\":126}}";
String notificationKey = "{\"payload\":{\"notification_uid\":127}}";
String caseManagementKey = "{\"payload\":{\"public_health_case_uid\":128,\"case_management_uid\":1001}}";
String ldfKey = "{\"payload\":{\"ldf_uid\":129}}";
String interviewKey = "{\"payload\":{\"interview_uid\":130}}";

String observationKey = "{\"payload\":{\"observation_uid\":130}}";
String observationMsg = "{\"payload\":{\"observation_uid\":130, \"obs_domain_cd_st_1\": \"Order\",\"ctrl_cd_display_form\": \"MorbReport\"}}";


String orgTopic = "dummy_organization";
String providerTopic = "dummy_provider";
String patientTopic = "dummy_patient";
String placeTopic = "dummy_place";
String invTopic = "dummy_investigation";
String ntfTopic = "dummy_notification";
String intTopic = "dummy_interview";
Expand All @@ -388,9 +399,11 @@ void testPostProcessCacheIdsPriority() {
String obsTopic = "dummy_observation";



postProcessingServiceMock.postProcessMessage(invTopic, investigationKey, investigationKey);
postProcessingServiceMock.postProcessMessage(providerTopic, providerKey, providerKey);
postProcessingServiceMock.postProcessMessage(patientTopic, patientKey, patientKey);
postProcessingServiceMock.postProcessMessage(placeTopic, placeKey, placeKey);
postProcessingServiceMock.postProcessMessage(intTopic, interviewKey, interviewKey);
postProcessingServiceMock.postProcessMessage(ntfTopic, notificationKey, notificationKey);
postProcessingServiceMock.postProcessMessage(orgTopic, orgKey, orgKey);
Expand All @@ -406,16 +419,17 @@ void testPostProcessCacheIdsPriority() {
assertTrue(topicLogList.get(0).contains(orgTopic));
assertTrue(topicLogList.get(1).contains(providerTopic));
assertTrue(topicLogList.get(2).contains(patientTopic));
assertTrue(topicLogList.get(3).contains(invTopic));
assertTrue(topicLogList.get(3).contains(placeTopic));
assertTrue(topicLogList.get(4).contains(invTopic));
assertTrue(topicLogList.get(5).contains(invTopic));
assertTrue(topicLogList.get(6).contains(ntfTopic));
assertTrue(topicLogList.get(7).contains(intTopic));
assertTrue(topicLogList.get(6).contains(invTopic));
assertTrue(topicLogList.get(7).contains(ntfTopic));
assertTrue(topicLogList.get(8).contains(intTopic));
assertTrue(topicLogList.get(9).contains(cmTopic));
assertTrue(topicLogList.get(9).contains(intTopic));
assertTrue(topicLogList.get(10).contains(cmTopic));
assertTrue(topicLogList.get(11).contains(ldfTopic));
assertTrue(topicLogList.get(12).contains(obsTopic));
assertTrue(topicLogList.get(11).contains(cmTopic));
assertTrue(topicLogList.get(12).contains(ldfTopic));
assertTrue(topicLogList.get(13).contains(obsTopic));
}

@Test
Expand Down Expand Up @@ -496,6 +510,47 @@ void testProduceDatamartTopicWithNoPatient() {
verify(kafkaTemplate, never()).send(anyString(), anyString(), anyString());
}

@Test
void testPostProcessPlaceMessage() {
String topic = "dummy_place";
String key = "{\"payload\":{\"place_uid\":123}}";

postProcessingServiceMock.postProcessMessage(topic, key, key);
postProcessingServiceMock.processCachedIds();

String expectedPlaceIdsString = "123";
verify(postProcRepositoryMock).executeStoredProcForDPlace(expectedPlaceIdsString);

List<ILoggingEvent> logs = listAppender.list;
assertEquals(4, logs.size());
assertTrue(logs.get(2).getFormattedMessage().contains(PostProcessingService.Entity.D_PLACE.getStoredProcedure()));
assertTrue(logs.get(3).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

@Test
void testPostProcessMultipleMessages_WithPlace() {
String placeKey1 = "{\"payload\":{\"place_uid\":123}}";
String placeKey2 = "{\"payload\":{\"place_uid\":124}}";
String placeTopic = "dummy_place";

postProcessingServiceMock.postProcessMessage(placeTopic, placeKey1, placeKey1);
postProcessingServiceMock.postProcessMessage(placeTopic, placeKey2, placeKey2);

assertTrue(postProcessingServiceMock.idCache.containsKey(placeTopic));

postProcessingServiceMock.processCachedIds();

verify(postProcRepositoryMock).executeStoredProcForDPlace("123,124");
}
@Test
void testPostProcessNoPlaceUidException() {
String placeKey = "{\"payload\":{}}";
String topic = "dummy_place";

RuntimeException ex = assertThrows(RuntimeException.class,
() -> postProcessingServiceMock.postProcessMessage(topic, placeKey, placeKey));
assertEquals(NoSuchElementException.class, ex.getCause().getClass());
}
@ParameterizedTest
@CsvSource({
"'{\"payload\":{\"public_health_case_uid\":123,\"rdb_table_name_list\":null}}'",
Expand Down

0 comments on commit 83cbfa3

Please sign in to comment.