Skip to content

Commit

Permalink
RTR investigation service - add PHC datamart feature flag (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
sveselev authored Aug 27, 2024
1 parent f8bf154 commit 606aa9e
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public class InvestigationService {
@Value("${spring.kafka.output.topic-name-reporting}")
public String investigationTopicReporting;

@Value("${service.phc-datamart-enable}")
public boolean phcDatamartEnable;

private final InvestigationRepository investigationRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
private final ProcessInvestigationDataUtil processDataUtil;
Expand Down Expand Up @@ -90,7 +93,9 @@ public String processInvestigation(String value) {
publicHealthCaseUid = payloadNode.get("public_health_case_uid").asText();
investigationKey.setPublicHealthCaseUid(Long.valueOf(publicHealthCaseUid));

processPhcFactDatamart(publicHealthCaseUid);
if (phcDatamartEnable) {
processPhcFactDatamart(publicHealthCaseUid);
}

logger.debug(topicDebugLog, publicHealthCaseUid, investigationTopic);
Optional<Investigation> investigationData = investigationRepository.computeInvestigations(publicHealthCaseUid);
Expand Down
2 changes: 2 additions & 0 deletions investigation-service/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,7 @@ logging:
file:
name: InvestigationService.log
path: ${DA_LOG_PATH:logs}
service:
phc-datamart-enable: ${PHC_DM_ENABLE:false}
server:
port: '8093'
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
CREATE PROCEDURE [dbo].[sp_investigation_event] @phc_id_list nvarchar(max)
CREATE OR ALTER PROCEDURE [dbo].[sp_investigation_event] @phc_id_list nvarchar(max)
AS
BEGIN

BEGIN TRY

DECLARE @batch_id BIGINT;

/*NBS case answer section
* TODO: Bring in null rows*/

SET @batch_id = cast((format(getdate(),'yyMMddHHmmss')) as bigint);
INSERT INTO [rdb_modern].[dbo].[job_flow_log]
( batch_id
Expand Down Expand Up @@ -179,10 +182,11 @@ BEGIN
results.coinfection_id,
results.contact_inv_txt,
pac.prog_area_desc_txt program_area_description,
notification.local_id notification_local_id,
notification.add_time notification_add_time,
notification.record_status_cd notification_record_status_cd,
notification.last_chg_time notification_last_chg_time,
notification.notification_uid,
notification.notification_local_id,
notification.notification_add_time,
notification.notification_record_status_cd,
notification.notification_last_chg_time,
cm.case_management_uid,
investigation_act_entity.nac_page_case_uid,
investigation_act_entity.nac_last_chg_time,
Expand Down Expand Up @@ -564,8 +568,19 @@ BEGIN
WHERE
phc.public_health_case_uid in (SELECT value FROM STRING_SPLIT(@phc_id_list
, ','))) AS results
JOIN act_relationship act_rel WITH (NOLOCK) on act_rel.target_act_uid = results.public_health_case_uid AND act_rel.target_class_cd = 'CASE'
JOIN notification WITH (NOLOCK) on act_rel.source_act_uid = notification.notification_uid AND act_rel.source_class_cd = 'NOTF'
LEFT JOIN
(SELECT DISTINCT act_rel.target_act_uid,
notification.notification_uid,
notification.local_id notification_local_id,
notification.add_time notification_add_time,
notification.record_status_cd notification_record_status_cd,
notification.last_chg_time notification_last_chg_time
from act_relationship act_rel WITH (NOLOCK)
INNER JOIN notification WITH (NOLOCK) on act_rel.source_act_uid = notification.notification_uid AND act_rel.source_class_cd = 'NOTF'
where act_rel.target_class_cd = 'CASE'
group by target_act_uid,notification_uid, local_id, notification.add_time, notification.record_status_cd,notification.last_chg_time
) as notification
on notification.target_act_uid = results.public_health_case_uid
LEFT JOIN nbs_srte.dbo.jurisdiction_code jc WITH (NOLOCK) ON results.jurisdiction_cd = jc.code
LEFT JOIN act WITH (NOLOCK) ON act.act_uid = results.public_health_case_uid
LEFT JOIN nbs_srte.dbo.program_area_code pac WITH (NOLOCK) on results.prog_area_cd = pac.prog_area_cd
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ public void postProcessMessage(
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Payload String payload) {
Long id = extractIdFromMessage(topic, key, payload);
logger.info("Adding id to cache: {} for topic: {}", id, topic);
if (id != null) {
idCache.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(id);
}
Expand Down Expand Up @@ -172,10 +171,9 @@ protected void processCachedIds() {
List<Long> ids = entry.getValue();
idCache.put(keyTopic, new ArrayList<>());

logger.info("Processing {} ids from topic: {}", ids.size(), keyTopic);
logger.info("Processing {} id(s) from topic: {}", ids.size(), keyTopic);

Entity entity = getEntityByTopic(keyTopic);
logger.info("{} data is about to be processed...", entity.getName());
switch (entity) {
case ORGANIZATION:
processTopic(keyTopic, entity, ids,
Expand Down Expand Up @@ -277,7 +275,7 @@ Long extractIdFromMessage(String topic, String messageKey, String payload) {
private Entity getEntityByTopic(String topic) {
return Arrays.stream(Entity.values())
.filter(entity -> entity.getPriority() > 0)
.filter(entity -> topic.contains(entity.getName()))
.filter(entity -> topic.endsWith(entity.getName()))
.findFirst()
.orElse(Entity.UNKNOWN);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ void testPostProcessPatientMessage() {
assertTrue(postProcessingServiceMock.idCache.containsKey(topic));

List<ILoggingEvent> logs = listAppender.list;
assertEquals(6, logs.size());
assertTrue(logs.get(5).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
assertEquals(4, logs.size());
assertTrue(logs.get(3).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

@Test
Expand All @@ -108,8 +108,8 @@ void testPostProcessProviderMessage() {
assertTrue(postProcessingServiceMock.idCache.containsKey(topic));

List<ILoggingEvent> logs = listAppender.list;
assertEquals(6, logs.size());
assertTrue(logs.get(5).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
assertEquals(4, logs.size());
assertTrue(logs.get(3).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

@Test
Expand All @@ -125,8 +125,8 @@ void testPostProcessOrganizationMessage() {
assertTrue(postProcessingServiceMock.idCache.containsKey(topic));

List<ILoggingEvent> logs = listAppender.list;
assertEquals(6, logs.size());
assertTrue(logs.get(5).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
assertEquals(4, logs.size());
assertTrue(logs.get(3).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

@Test
Expand All @@ -143,9 +143,9 @@ void testPostProcessInvestigationMessage() {
assertTrue(postProcessingServiceMock.idCache.containsKey(topic));

List<ILoggingEvent> logs = listAppender.list;
assertEquals(8, logs.size());
assertTrue(logs.get(4).getFormattedMessage().contains(PostProcessingService.Entity.INVESTIGATION.getStoredProcedure()));
assertTrue(logs.get(7).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
assertEquals(6, logs.size());
assertTrue(logs.get(2).getFormattedMessage().contains(PostProcessingService.Entity.INVESTIGATION.getStoredProcedure()));
assertTrue(logs.get(5).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

@Test
Expand All @@ -161,9 +161,9 @@ void testPostProcessNotificationMessage() {
assertTrue(postProcessingServiceMock.idCache.containsKey(topic));

List<ILoggingEvent> logs = listAppender.list;
assertEquals(6, logs.size());
assertTrue(logs.get(4).getFormattedMessage().contains(PostProcessingService.Entity.NOTIFICATION.getStoredProcedure()));
assertTrue(logs.get(5).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
assertEquals(4, logs.size());
assertTrue(logs.get(2).getFormattedMessage().contains(PostProcessingService.Entity.NOTIFICATION.getStoredProcedure()));
assertTrue(logs.get(3).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

@Test
Expand All @@ -186,8 +186,8 @@ void testPostProcessPageBuilder() {
expectedRdbTableNames);

List<ILoggingEvent> logs = listAppender.list;
assertEquals(10, logs.size());
assertTrue(logs.get(9).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
assertEquals(8, logs.size());
assertTrue(logs.get(7).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

@Test
Expand All @@ -203,9 +203,9 @@ void testPostProcessLdfData() {
assertTrue(postProcessingServiceMock.idCache.containsKey(topic));

List<ILoggingEvent> logs = listAppender.list;
assertEquals(6, logs.size());
assertTrue(logs.get(4).getFormattedMessage().contains(PostProcessingService.Entity.LDF_DATA.getStoredProcedure()));
assertTrue(logs.get(5).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
assertEquals(4, logs.size());
assertTrue(logs.get(2).getFormattedMessage().contains(PostProcessingService.Entity.LDF_DATA.getStoredProcedure()));
assertTrue(logs.get(3).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

@Test
Expand Down Expand Up @@ -338,7 +338,7 @@ void testPostProcessPageBuilderWithNoTableCache() {

verify(investigationRepositoryMock, never()).executeStoredProcForPageBuilder(anyLong(), anyString());
List<ILoggingEvent> logs = listAppender.list;
assertEquals(8, logs.size());
assertEquals(6, logs.size());
}

@Test
Expand Down

0 comments on commit 606aa9e

Please sign in to comment.