diff --git a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java index 55a2bdd1..043f7011 100644 --- a/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java +++ b/investigation-service/src/main/java/gov/cdc/etldatapipeline/investigation/service/InvestigationService.java @@ -45,6 +45,9 @@ public class InvestigationService { @Value("${spring.kafka.output.topic-name-reporting}") public String investigationTopicReporting; + @Value("${service.phc-datamart-enable}") + public boolean phcDatamartEnable; + private final InvestigationRepository investigationRepository; private final KafkaTemplate kafkaTemplate; private final ProcessInvestigationDataUtil processDataUtil; @@ -90,7 +93,9 @@ public String processInvestigation(String value) { publicHealthCaseUid = payloadNode.get("public_health_case_uid").asText(); investigationKey.setPublicHealthCaseUid(Long.valueOf(publicHealthCaseUid)); - processPhcFactDatamart(publicHealthCaseUid); + if (phcDatamartEnable) { + processPhcFactDatamart(publicHealthCaseUid); + } logger.debug(topicDebugLog, publicHealthCaseUid, investigationTopic); Optional investigationData = investigationRepository.computeInvestigations(publicHealthCaseUid); diff --git a/investigation-service/src/main/resources/application.yaml b/investigation-service/src/main/resources/application.yaml index 90ddf308..83b61a1b 100644 --- a/investigation-service/src/main/resources/application.yaml +++ b/investigation-service/src/main/resources/application.yaml @@ -38,5 +38,7 @@ logging: file: name: InvestigationService.log path: ${DA_LOG_PATH:logs} +service: + phc-datamart-enable: ${PHC_DM_ENABLE:false} server: port: '8093' \ No newline at end of file diff --git a/liquibase-service/src/main/resources/db/odse/routines/010-sp_investigation_event-001.sql b/liquibase-service/src/main/resources/db/odse/routines/010-sp_investigation_event-001.sql index be4d0e0b..c1b8747e 100644 --- a/liquibase-service/src/main/resources/db/odse/routines/010-sp_investigation_event-001.sql +++ b/liquibase-service/src/main/resources/db/odse/routines/010-sp_investigation_event-001.sql @@ -1,4 +1,4 @@ -CREATE PROCEDURE [dbo].[sp_investigation_event] @phc_id_list nvarchar(max) +CREATE OR ALTER PROCEDURE [dbo].[sp_investigation_event] @phc_id_list nvarchar(max) AS BEGIN @@ -6,6 +6,9 @@ BEGIN DECLARE @batch_id BIGINT; + /*NBS case answer section + * TODO: Bring in null rows*/ + SET @batch_id = cast((format(getdate(),'yyMMddHHmmss')) as bigint); INSERT INTO [rdb_modern].[dbo].[job_flow_log] ( batch_id @@ -179,10 +182,11 @@ BEGIN results.coinfection_id, results.contact_inv_txt, pac.prog_area_desc_txt program_area_description, - notification.local_id notification_local_id, - notification.add_time notification_add_time, - notification.record_status_cd notification_record_status_cd, - notification.last_chg_time notification_last_chg_time, + notification.notification_uid, + notification.notification_local_id, + notification.notification_add_time, + notification.notification_record_status_cd, + notification.notification_last_chg_time, cm.case_management_uid, investigation_act_entity.nac_page_case_uid, investigation_act_entity.nac_last_chg_time, @@ -564,8 +568,19 @@ BEGIN WHERE phc.public_health_case_uid in (SELECT value FROM STRING_SPLIT(@phc_id_list , ','))) AS results - JOIN act_relationship act_rel WITH (NOLOCK) on act_rel.target_act_uid = results.public_health_case_uid AND act_rel.target_class_cd = 'CASE' - JOIN notification WITH (NOLOCK) on act_rel.source_act_uid = notification.notification_uid AND act_rel.source_class_cd = 'NOTF' + LEFT JOIN + (SELECT DISTINCT act_rel.target_act_uid, + notification.notification_uid, + notification.local_id notification_local_id, + notification.add_time notification_add_time, + notification.record_status_cd notification_record_status_cd, + notification.last_chg_time notification_last_chg_time + from act_relationship act_rel WITH (NOLOCK) + INNER JOIN notification WITH (NOLOCK) on act_rel.source_act_uid = notification.notification_uid AND act_rel.source_class_cd = 'NOTF' + where act_rel.target_class_cd = 'CASE' + group by target_act_uid,notification_uid, local_id, notification.add_time, notification.record_status_cd,notification.last_chg_time + ) as notification + on notification.target_act_uid = results.public_health_case_uid LEFT JOIN nbs_srte.dbo.jurisdiction_code jc WITH (NOLOCK) ON results.jurisdiction_cd = jc.code LEFT JOIN act WITH (NOLOCK) ON act.act_uid = results.public_health_case_uid LEFT JOIN nbs_srte.dbo.program_area_code pac WITH (NOLOCK) on results.prog_area_cd = pac.prog_area_cd diff --git a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java index d3576df4..53e7a661 100644 --- a/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java +++ b/post-processing-service/src/main/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingService.java @@ -109,7 +109,6 @@ public void postProcessMessage( @Header(KafkaHeaders.RECEIVED_KEY) String key, @Payload String payload) { Long id = extractIdFromMessage(topic, key, payload); - logger.info("Adding id to cache: {} for topic: {}", id, topic); if (id != null) { idCache.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(id); } @@ -172,10 +171,9 @@ protected void processCachedIds() { List ids = entry.getValue(); idCache.put(keyTopic, new ArrayList<>()); - logger.info("Processing {} ids from topic: {}", ids.size(), keyTopic); + logger.info("Processing {} id(s) from topic: {}", ids.size(), keyTopic); Entity entity = getEntityByTopic(keyTopic); - logger.info("{} data is about to be processed...", entity.getName()); switch (entity) { case ORGANIZATION: processTopic(keyTopic, entity, ids, @@ -277,7 +275,7 @@ Long extractIdFromMessage(String topic, String messageKey, String payload) { private Entity getEntityByTopic(String topic) { return Arrays.stream(Entity.values()) .filter(entity -> entity.getPriority() > 0) - .filter(entity -> topic.contains(entity.getName())) + .filter(entity -> topic.endsWith(entity.getName())) .findFirst() .orElse(Entity.UNKNOWN); } diff --git a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java index 399c73ce..97f5f2f1 100644 --- a/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java +++ b/post-processing-service/src/test/java/gov/cdc/etldatapipeline/postprocessingservice/service/PostProcessingServiceTest.java @@ -91,8 +91,8 @@ void testPostProcessPatientMessage() { assertTrue(postProcessingServiceMock.idCache.containsKey(topic)); List logs = listAppender.list; - assertEquals(6, logs.size()); - assertTrue(logs.get(5).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); + assertEquals(4, logs.size()); + assertTrue(logs.get(3).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); } @Test @@ -108,8 +108,8 @@ void testPostProcessProviderMessage() { assertTrue(postProcessingServiceMock.idCache.containsKey(topic)); List logs = listAppender.list; - assertEquals(6, logs.size()); - assertTrue(logs.get(5).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); + assertEquals(4, logs.size()); + assertTrue(logs.get(3).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); } @Test @@ -125,8 +125,8 @@ void testPostProcessOrganizationMessage() { assertTrue(postProcessingServiceMock.idCache.containsKey(topic)); List logs = listAppender.list; - assertEquals(6, logs.size()); - assertTrue(logs.get(5).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); + assertEquals(4, logs.size()); + assertTrue(logs.get(3).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); } @Test @@ -143,9 +143,9 @@ void testPostProcessInvestigationMessage() { assertTrue(postProcessingServiceMock.idCache.containsKey(topic)); List logs = listAppender.list; - assertEquals(8, logs.size()); - assertTrue(logs.get(4).getFormattedMessage().contains(PostProcessingService.Entity.INVESTIGATION.getStoredProcedure())); - assertTrue(logs.get(7).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); + assertEquals(6, logs.size()); + assertTrue(logs.get(2).getFormattedMessage().contains(PostProcessingService.Entity.INVESTIGATION.getStoredProcedure())); + assertTrue(logs.get(5).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); } @Test @@ -161,9 +161,9 @@ void testPostProcessNotificationMessage() { assertTrue(postProcessingServiceMock.idCache.containsKey(topic)); List logs = listAppender.list; - assertEquals(6, logs.size()); - assertTrue(logs.get(4).getFormattedMessage().contains(PostProcessingService.Entity.NOTIFICATION.getStoredProcedure())); - assertTrue(logs.get(5).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); + assertEquals(4, logs.size()); + assertTrue(logs.get(2).getFormattedMessage().contains(PostProcessingService.Entity.NOTIFICATION.getStoredProcedure())); + assertTrue(logs.get(3).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); } @Test @@ -186,8 +186,8 @@ void testPostProcessPageBuilder() { expectedRdbTableNames); List logs = listAppender.list; - assertEquals(10, logs.size()); - assertTrue(logs.get(9).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); + assertEquals(8, logs.size()); + assertTrue(logs.get(7).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); } @Test @@ -203,9 +203,9 @@ void testPostProcessLdfData() { assertTrue(postProcessingServiceMock.idCache.containsKey(topic)); List logs = listAppender.list; - assertEquals(6, logs.size()); - assertTrue(logs.get(4).getFormattedMessage().contains(PostProcessingService.Entity.LDF_DATA.getStoredProcedure())); - assertTrue(logs.get(5).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); + assertEquals(4, logs.size()); + assertTrue(logs.get(2).getFormattedMessage().contains(PostProcessingService.Entity.LDF_DATA.getStoredProcedure())); + assertTrue(logs.get(3).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED)); } @Test @@ -338,7 +338,7 @@ void testPostProcessPageBuilderWithNoTableCache() { verify(investigationRepositoryMock, never()).executeStoredProcForPageBuilder(anyLong(), anyString()); List logs = listAppender.list; - assertEquals(8, logs.size()); + assertEquals(6, logs.size()); } @Test