diff --git a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/model/dto/LdfDataKey.java b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/model/dto/LdfDataKey.java index 1b988238..ff7c6871 100644 --- a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/model/dto/LdfDataKey.java +++ b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/model/dto/LdfDataKey.java @@ -11,7 +11,4 @@ public class LdfDataKey { @NonNull @JsonProperty("ldf_uid") private Long ldfUid; - - @JsonProperty("business_object_uid") - private Long businessObjectUid; } diff --git a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/service/LdfDataService.java b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/service/LdfDataService.java index 8bb625ee..99d40360 100644 --- a/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/service/LdfDataService.java +++ b/ldfdata-service/src/main/java/gov/cdc/etldatapipeline/ldfdata/service/LdfDataService.java @@ -92,7 +92,6 @@ public String processLdfData(String value) { Optional ldfData = ldfDataRepository.computeLdfData(busObjNm, ldfUid, busObjUid); if (ldfData.isPresent()) { ldfDataKey.setLdfUid(Long.valueOf(ldfUid)); - ldfDataKey.setBusinessObjectUid(Long.valueOf(busObjUid)); pushKeyValuePairToKafka(ldfDataKey, ldfData.get(), ldfDataTopicReporting); return objectMapper.writeValueAsString(ldfData.get()); } diff --git a/ldfdata-service/src/test/java/gov/cdc/etldatapipeline/ldfdata/service/LdfDataServiceTest.java b/ldfdata-service/src/test/java/gov/cdc/etldatapipeline/ldfdata/service/LdfDataServiceTest.java index 005abf66..642cacfe 100644 --- a/ldfdata-service/src/test/java/gov/cdc/etldatapipeline/ldfdata/service/LdfDataServiceTest.java +++ b/ldfdata-service/src/test/java/gov/cdc/etldatapipeline/ldfdata/service/LdfDataServiceTest.java @@ -5,6 +5,7 @@ import gov.cdc.etldatapipeline.ldfdata.repository.LdfDataRepository; import gov.cdc.etldatapipeline.ldfdata.model.dto.LdfData; import gov.cdc.etldatapipeline.ldfdata.model.dto.LdfDataKey; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -35,11 +36,17 @@ class LdfDataServiceTest { @Captor private ArgumentCaptor messageCaptor; + private AutoCloseable closeable; private final CustomJsonGeneratorImpl jsonGenerator = new CustomJsonGeneratorImpl(); @BeforeEach void setUp() { - MockitoAnnotations.openMocks(this); + closeable=MockitoAnnotations.openMocks(this); + } + + @AfterEach + void tearDown() throws Exception { + closeable.close(); } @Test @@ -100,7 +107,6 @@ private void validateData(String inputTopicName, String outputTopicName, LdfDataKey ldfDataKey = new LdfDataKey(); ldfDataKey.setLdfUid(ldfData.getLdfUid()); - ldfDataKey.setBusinessObjectUid(ldfData.getBusinessObjectUid()); String expectedKey = jsonGenerator.generateStringJson(ldfDataKey); String expectedValue = jsonGenerator.generateStringJson(ldfData); @@ -110,7 +116,6 @@ private void validateData(String inputTopicName, String outputTopicName, assertEquals(expectedKey, keyCaptor.getValue()); assertEquals(expectedValue, messageCaptor.getValue()); assertTrue(keyCaptor.getValue().contains(String.valueOf(ldfDataKey.getLdfUid()))); - assertTrue(keyCaptor.getValue().contains(String.valueOf(ldfDataKey.getBusinessObjectUid()))); } private LdfDataService getInvestigationService(String inputTopicName, String outputTopicName) { diff --git a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/repository/PostProcRepository.java b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/repository/PostProcRepository.java index 6a3f7f91..b521f88a 100644 --- a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/repository/PostProcRepository.java +++ b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/repository/PostProcRepository.java @@ -19,4 +19,7 @@ public interface PostProcRepository extends JpaRepository { @Procedure("sp_nrt_notification_postprocessing") void executeStoredProcForNotificationIds(@Param("notificationUids") String notificationUids); + + @Procedure("sp_nrt_ldf_postprocessing") + void executeStoredProcForLdfIds(@Param("ldf_uids") String ldfUids); } diff --git a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java index 78dab696..db04f317 100644 --- a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java +++ b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java @@ -24,6 +24,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; import java.util.*; import java.util.Map.Entry; @@ -54,13 +55,14 @@ public class PostProcessingService { private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); @Getter - private enum Entity { + enum Entity { ORGANIZATION(1, "organization", "organization_uid", "sp_nrt_organization_postprocessing"), PROVIDER(2, "provider", "provider_uid", "sp_nrt_provider_postprocessing"), PATIENT(3, "patient", "patient_uid", "sp_nrt_patient_postprocessing"), INVESTIGATION(4, "investigation", "public_health_case_uid", "sp_nrt_investigation_postprocessing"), NOTIFICATIONS(5, "notifications", "notification_uid", "sp_nrt_notification_postprocessing"), - F_PAGE_CASE(0, "investigation", "public_health_case_uid", "sp_f_page_case_postprocessing"), + LDF_DATA(6, "ldf_data", "ldf_uid", "sp_nrt_ldf_postprocessing"), + F_PAGE_CASE(0, "fact page case", "public_health_case_uid", "sp_f_page_case_postprocessing"), CASE_ANSWERS(0, "case answers", "public_health_case_uid", "sp_page_builder_postprocessing"), UNKNOWN(-1, "unknown", "unknown_uid", "sp_nrt_unknown_postprocessing"); @@ -99,7 +101,8 @@ private enum Entity { "${spring.kafka.topic.organization}", "${spring.kafka.topic.patient}", "${spring.kafka.topic.provider}", - "${spring.kafka.topic.notification}" + "${spring.kafka.topic.notification}", + "${spring.kafka.topic.ldf_data}" }) public void postProcessMessage( @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @@ -135,16 +138,17 @@ public void postProcessDatamart( JsonNode payloadNode = objectMapper.readTree(payload); Datamart dmData = objectMapper.readValue(payloadNode.get(PAYLOAD).toString(), Datamart.class); + final String PAYLOAD_PFX = "For payload: " + payloadNode; if (Objects.isNull(dmData)) { - throw new NoDataException("For payload: " + payloadNode + " DataMart object is null. Skipping further processing"); + throw new NoDataException(PAYLOAD_PFX + " DataMart object is null. Skipping further processing"); } Map dmMap = new HashMap<>(); if (Objects.isNull(dmData.getPublicHealthCaseUid()) || Objects.isNull(dmData.getPatientUid())) { - throw new NoDataException("For payload: " + payloadNode + " DataMart Public Health Case/Patient Id is null. Skipping further processing"); + throw new NoDataException(PAYLOAD_PFX + " DataMart Public Health Case/Patient Id is null. Skipping further processing"); } dmMap.put(dmData.getPublicHealthCaseUid(), dmData.getPatientUid()); if (Objects.isNull(dmData.getDatamart())) { - throw new NoDataException("For payload: " + payload + " DataMart value is null. Skipping further processing"); + throw new NoDataException(PAYLOAD_PFX + " DataMart value is null. Skipping further processing"); } dmCache.computeIfAbsent(dmData.getDatamart(), k -> ConcurrentHashMap.newKeySet()).add(dmMap); } catch (NoDataException nde) { @@ -185,8 +189,8 @@ protected void processCachedIds() { ids.forEach(id -> { if (idVals.containsKey(id)) { - processId(id, idVals.get(id), - investigationRepository::executeStoredProcForPageBuilder, Entity.CASE_ANSWERS); + processTopic(keyTopic, Entity.CASE_ANSWERS, id, idVals.get(id), + investigationRepository::executeStoredProcForPageBuilder); idVals.remove(id); } }); @@ -199,6 +203,10 @@ protected void processCachedIds() { processTopic(keyTopic, entity, ids, postProcRepository::executeStoredProcForNotificationIds); break; + case LDF_DATA: + processTopic(keyTopic, entity, ids, + postProcRepository::executeStoredProcForLdfIds); + break; default: logger.warn("Unknown topic: {} cannot be processed", keyTopic); break; @@ -223,7 +231,7 @@ protected void processDatamartIds() { String patients = dmSet.stream().flatMap(m -> m.values().stream().map(String::valueOf)).collect(Collectors.joining(",")); - logger.info("Processing the {} message topic. Calling stored proc: {}('{}','{}')", dmType, + logger.info("Processing {} message topic. Calling stored proc: {}('{}','{}')", dmType, "sp_hepatitis_datamart_postprocessing", cases, patients); investigationRepository.executeStoredProcForHepDatamart(cases, patients); completeLog(); @@ -247,7 +255,7 @@ Long extractIdFromMessage(String topic, String messageKey, String payload) { } id = keyNode.get(PAYLOAD).get(entity.getUidName()).asLong(); - if (topic.contains(Entity.INVESTIGATION.getName())) { + if (entity.equals(Entity.INVESTIGATION)) { JsonNode tblNode = payloadNode.get(PAYLOAD).get("rdb_table_name_list"); if (tblNode != null && !tblNode.isNull()) { idVals.put(id, tblNode.asText()); @@ -284,20 +292,20 @@ private List processTopic(String keyTopic, Entity entity, List ids, return result; } + private void processTopic(String keyTopic, Entity entity, Long id, String vals, BiConsumer repositoryMethod) { + logger.info("Processing {} for topic: {}. Calling stored proc: {} '{}', '{}'", StringUtils.capitalize(entity.getName()), keyTopic, + entity.getStoredProcedure(), id, vals); + repositoryMethod.accept(id, vals); + completeLog(); + } + private String prepareAndLog(String keyTopic, Entity entity, List ids) { String idsString = ids.stream().map(String::valueOf).collect(Collectors.joining(",")); - logger.info("Processing the {} message topic: {}. Calling stored proc: {} '{}'", entity.getName(), keyTopic, + logger.info("Processing {} for topic: {}. Calling stored proc: {} '{}'", StringUtils.capitalize(entity.getName()), keyTopic, entity.getStoredProcedure(), idsString); return idsString; } - private void processId(Long id, String vals, BiConsumer repositoryMethod, Entity entity) { - logger.info("Processing PHC ID for {}. Calling stored proc: {} '{}', '{}'", entity.getName(), - entity.getStoredProcedure(), id, vals); - repositoryMethod.accept(id, vals); - completeLog(); - } - private void completeLog() { logger.info(SP_EXECUTION_COMPLETED); } diff --git a/post-processing-service/src/main/resources/application.yaml b/post-processing-service/src/main/resources/application.yaml index 359cfe10..270c8262 100644 --- a/post-processing-service/src/main/resources/application.yaml +++ b/post-processing-service/src/main/resources/application.yaml @@ -7,6 +7,7 @@ spring: patient: nrt_patient provider: nrt_provider notification: nrt_notifications + ldf_data: nrt_ldf_data datamart: nbs_Datamart dlq: retry-suffix: -retry diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java index dd7cd87b..dd6ad4e2 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java @@ -72,6 +72,7 @@ public void tearDown() throws Exception { }) void testExtractIdFromMessage(String topic, String messageKey, Long expectedId) { Long extractedId = postProcessingServiceMock.extractIdFromMessage(topic, messageKey, messageKey); + assertNotNull(extractedId); assertEquals(expectedId, extractedId); } @@ -81,6 +82,8 @@ void testPostProcessPatientMessage() { String key = "{\"payload\":{\"patient_uid\":123}}"; postProcessingServiceMock.postProcessMessage(topic, key, key); + assertEquals(123L, postProcessingServiceMock.idCache.get(topic).get(0)); + postProcessingServiceMock.processCachedIds(); String expectedPatientIdsString = "123"; @@ -141,6 +144,7 @@ void testPostProcessInvestigationMessage() { List logs = listAppender.list; assertEquals(5, logs.size()); + assertTrue(logs.get(1).getFormattedMessage().contains(PostProcessingService.Entity.INVESTIGATION.getStoredProcedure())); assertTrue(logs.get(4).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); } @@ -158,6 +162,7 @@ void testPostProcessNotificationMessage() { List logs = listAppender.list; assertEquals(3, logs.size()); + assertTrue(logs.get(1).getFormattedMessage().contains(PostProcessingService.Entity.NOTIFICATIONS.getStoredProcedure())); assertTrue(logs.get(2).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); } @@ -185,6 +190,24 @@ void testPostProcessPageBuilder() { assertTrue(logs.get(6).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); } + @Test + void testPostProcessLdfData() { + String topic = "dummy_ldf_data"; + String key = "{\"payload\":{\"ldf_uid\":123}}"; + + postProcessingServiceMock.postProcessMessage(topic, key, key); + postProcessingServiceMock.processCachedIds(); + + String expectedLdfIdsString = "123"; + verify(postProcRepositoryMock).executeStoredProcForLdfIds(expectedLdfIdsString); + assertTrue(postProcessingServiceMock.idCache.containsKey(topic)); + + List logs = listAppender.list; + assertEquals(3, logs.size()); + assertTrue(logs.get(1).getFormattedMessage().contains(PostProcessingService.Entity.LDF_DATA.getStoredProcedure())); + assertTrue(logs.get(2).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); + } + @Test void testPostProcessMultipleMessages() { String orgKey1 = "{\"payload\":{\"organization_uid\":123}}"; @@ -225,31 +248,34 @@ void testPostProcessCacheIdsPriority() { String patientKey = "{\"payload\":{\"patient_uid\":125}}"; String investigationKey = "{\"payload\":{\"public_health_case_uid\":126}}"; String notificationKey = "{\"payload\":{\"notification_uid\":127}}"; + String ldfKey = "{\"payload\":{\"ldf_uid\":127}}"; String orgTopic = "dummy_organization"; String providerTopic = "dummy_provider"; String patientTopic = "dummy_patient"; String invTopic = "dummy_investigation"; - String notfTopic = "dummy_notifications"; + String ntfTopic = "dummy_notifications"; + String ldfTopic = "dummy_ldf_data"; postProcessingServiceMock.postProcessMessage(invTopic, investigationKey, investigationKey); postProcessingServiceMock.postProcessMessage(providerTopic, providerKey, providerKey); postProcessingServiceMock.postProcessMessage(patientTopic, patientKey, patientKey); - postProcessingServiceMock.postProcessMessage(notfTopic, notificationKey, notificationKey); + postProcessingServiceMock.postProcessMessage(ntfTopic, notificationKey, notificationKey); postProcessingServiceMock.postProcessMessage(orgTopic, orgKey, orgKey); + postProcessingServiceMock.postProcessMessage(ldfTopic, ldfKey, ldfKey); postProcessingServiceMock.processCachedIds(); List logs = listAppender.list; - assertEquals(17, logs.size()); - List topicLogList = logs.stream().map(ILoggingEvent::getFormattedMessage).filter(m -> m.contains( - "message topic")).toList(); + List topicLogList = logs.stream().map(ILoggingEvent::getFormattedMessage).filter(m -> m.matches( + "Processing .+ for topic: .*")).toList(); assertTrue(topicLogList.get(0).contains(orgTopic)); assertTrue(topicLogList.get(1).contains(providerTopic)); assertTrue(topicLogList.get(2).contains(patientTopic)); assertTrue(topicLogList.get(3).contains(invTopic)); assertTrue(topicLogList.get(4).contains(invTopic)); - assertTrue(topicLogList.get(5).contains(notfTopic)); + assertTrue(topicLogList.get(5).contains(ntfTopic)); + assertTrue(topicLogList.get(6).contains(ldfTopic)); } @Test