Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDIT-1364: LDF post-processing service integration #16

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,4 @@ public class LdfDataKey {
@NonNull
@JsonProperty("ldf_uid")
private Long ldfUid;

@JsonProperty("business_object_uid")
private Long businessObjectUid;
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public String processLdfData(String value) {
Optional<LdfData> ldfData = ldfDataRepository.computeLdfData(busObjNm, ldfUid, busObjUid);
if (ldfData.isPresent()) {
ldfDataKey.setLdfUid(Long.valueOf(ldfUid));
ldfDataKey.setBusinessObjectUid(Long.valueOf(busObjUid));
pushKeyValuePairToKafka(ldfDataKey, ldfData.get(), ldfDataTopicReporting);
return objectMapper.writeValueAsString(ldfData.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import gov.cdc.etldatapipeline.ldfdata.repository.LdfDataRepository;
import gov.cdc.etldatapipeline.ldfdata.model.dto.LdfData;
import gov.cdc.etldatapipeline.ldfdata.model.dto.LdfDataKey;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -35,11 +36,17 @@ class LdfDataServiceTest {
@Captor
private ArgumentCaptor<String> messageCaptor;

private AutoCloseable closeable;
private final CustomJsonGeneratorImpl jsonGenerator = new CustomJsonGeneratorImpl();

@BeforeEach
void setUp() {
MockitoAnnotations.openMocks(this);
closeable=MockitoAnnotations.openMocks(this);
}

// Closes the AutoCloseable returned by MockitoAnnotations.openMocks(this) in setUp(),
// releasing mock resources after each test so mock state does not leak between tests.
@AfterEach
void tearDown() throws Exception {
closeable.close();
}

@Test
Expand Down Expand Up @@ -100,7 +107,6 @@ private void validateData(String inputTopicName, String outputTopicName,

LdfDataKey ldfDataKey = new LdfDataKey();
ldfDataKey.setLdfUid(ldfData.getLdfUid());
ldfDataKey.setBusinessObjectUid(ldfData.getBusinessObjectUid());

String expectedKey = jsonGenerator.generateStringJson(ldfDataKey);
String expectedValue = jsonGenerator.generateStringJson(ldfData);
Expand All @@ -110,7 +116,6 @@ private void validateData(String inputTopicName, String outputTopicName,
assertEquals(expectedKey, keyCaptor.getValue());
assertEquals(expectedValue, messageCaptor.getValue());
assertTrue(keyCaptor.getValue().contains(String.valueOf(ldfDataKey.getLdfUid())));
assertTrue(keyCaptor.getValue().contains(String.valueOf(ldfDataKey.getBusinessObjectUid())));
}

private LdfDataService getInvestigationService(String inputTopicName, String outputTopicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ public interface PostProcRepository extends JpaRepository<PostProcSp, Long> {

/**
 * Invokes the {@code sp_nrt_notification_postprocessing} stored procedure.
 *
 * @param notificationUids comma-separated notification uids to post-process
 */
@Procedure("sp_nrt_notification_postprocessing")
void executeStoredProcForNotificationIds(@Param("notificationUids") String notificationUids);

/**
 * Invokes the {@code sp_nrt_ldf_postprocessing} stored procedure.
 *
 * @param ldfUids comma-separated ldf uids to post-process
 */
// NOTE(review): the @Param name "ldf_uids" is snake_case while the sibling method
// uses camelCase ("notificationUids") — confirm it matches the procedure's actual
// parameter name.
@Procedure("sp_nrt_ldf_postprocessing")
void executeStoredProcForLdfIds(@Param("ldf_uids") String ldfUids);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.util.*;
import java.util.Map.Entry;
Expand Down Expand Up @@ -54,13 +55,14 @@ public class PostProcessingService {
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

@Getter
private enum Entity {
enum Entity {
ORGANIZATION(1, "organization", "organization_uid", "sp_nrt_organization_postprocessing"),
PROVIDER(2, "provider", "provider_uid", "sp_nrt_provider_postprocessing"),
PATIENT(3, "patient", "patient_uid", "sp_nrt_patient_postprocessing"),
INVESTIGATION(4, "investigation", "public_health_case_uid", "sp_nrt_investigation_postprocessing"),
NOTIFICATIONS(5, "notifications", "notification_uid", "sp_nrt_notification_postprocessing"),
F_PAGE_CASE(0, "investigation", "public_health_case_uid", "sp_f_page_case_postprocessing"),
LDF_DATA(6, "ldf_data", "ldf_uid", "sp_nrt_ldf_postprocessing"),
F_PAGE_CASE(0, "fact page case", "public_health_case_uid", "sp_f_page_case_postprocessing"),
sveselev marked this conversation as resolved.
Show resolved Hide resolved
CASE_ANSWERS(0, "case answers", "public_health_case_uid", "sp_page_builder_postprocessing"),
UNKNOWN(-1, "unknown", "unknown_uid", "sp_nrt_unknown_postprocessing");

Expand Down Expand Up @@ -99,7 +101,8 @@ private enum Entity {
"${spring.kafka.topic.organization}",
"${spring.kafka.topic.patient}",
"${spring.kafka.topic.provider}",
"${spring.kafka.topic.notification}"
"${spring.kafka.topic.notification}",
"${spring.kafka.topic.ldf_data}"
})
public void postProcessMessage(
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Expand Down Expand Up @@ -135,16 +138,17 @@ public void postProcessDatamart(
JsonNode payloadNode = objectMapper.readTree(payload);

Datamart dmData = objectMapper.readValue(payloadNode.get(PAYLOAD).toString(), Datamart.class);
final String PAYLOAD_PFX = "For payload: " + payloadNode;
if (Objects.isNull(dmData)) {
throw new NoDataException("For payload: " + payloadNode + " DataMart object is null. Skipping further processing");
throw new NoDataException(PAYLOAD_PFX + " DataMart object is null. Skipping further processing");
}
Map<Long, Long> dmMap = new HashMap<>();
if (Objects.isNull(dmData.getPublicHealthCaseUid()) || Objects.isNull(dmData.getPatientUid())) {
throw new NoDataException("For payload: " + payloadNode + " DataMart Public Health Case/Patient Id is null. Skipping further processing");
throw new NoDataException(PAYLOAD_PFX + " DataMart Public Health Case/Patient Id is null. Skipping further processing");
}
dmMap.put(dmData.getPublicHealthCaseUid(), dmData.getPatientUid());
if (Objects.isNull(dmData.getDatamart())) {
throw new NoDataException("For payload: " + payload + " DataMart value is null. Skipping further processing");
throw new NoDataException(PAYLOAD_PFX + " DataMart value is null. Skipping further processing");
}
dmCache.computeIfAbsent(dmData.getDatamart(), k -> ConcurrentHashMap.newKeySet()).add(dmMap);
} catch (NoDataException nde) {
Expand Down Expand Up @@ -185,8 +189,8 @@ protected void processCachedIds() {

ids.forEach(id -> {
if (idVals.containsKey(id)) {
processId(id, idVals.get(id),
investigationRepository::executeStoredProcForPageBuilder, Entity.CASE_ANSWERS);
processTopic(keyTopic, Entity.CASE_ANSWERS, id, idVals.get(id),
investigationRepository::executeStoredProcForPageBuilder);
idVals.remove(id);
}
});
Expand All @@ -199,6 +203,10 @@ protected void processCachedIds() {
processTopic(keyTopic, entity, ids,
postProcRepository::executeStoredProcForNotificationIds);
break;
case LDF_DATA:
sveselev marked this conversation as resolved.
Show resolved Hide resolved
processTopic(keyTopic, entity, ids,
postProcRepository::executeStoredProcForLdfIds);
break;
default:
logger.warn("Unknown topic: {} cannot be processed", keyTopic);
break;
Expand All @@ -223,7 +231,7 @@ protected void processDatamartIds() {
String patients =
dmSet.stream().flatMap(m -> m.values().stream().map(String::valueOf)).collect(Collectors.joining(","));

logger.info("Processing the {} message topic. Calling stored proc: {}('{}','{}')", dmType,
logger.info("Processing {} message topic. Calling stored proc: {}('{}','{}')", dmType,
"sp_hepatitis_datamart_postprocessing", cases, patients);
investigationRepository.executeStoredProcForHepDatamart(cases, patients);
completeLog();
Expand All @@ -247,7 +255,7 @@ Long extractIdFromMessage(String topic, String messageKey, String payload) {
}
id = keyNode.get(PAYLOAD).get(entity.getUidName()).asLong();

if (topic.contains(Entity.INVESTIGATION.getName())) {
if (entity.equals(Entity.INVESTIGATION)) {
JsonNode tblNode = payloadNode.get(PAYLOAD).get("rdb_table_name_list");
if (tblNode != null && !tblNode.isNull()) {
idVals.put(id, tblNode.asText());
Expand Down Expand Up @@ -284,20 +292,20 @@ private <T> List<T> processTopic(String keyTopic, Entity entity, List<Long> ids,
return result;
}

/**
 * Single-id variant: logs and runs the entity's stored procedure for one id
 * together with its associated values string, then emits the completion marker.
 */
private void processTopic(String keyTopic, Entity entity, Long id, String vals, BiConsumer<Long, String> repositoryMethod) {
    String entityLabel = StringUtils.capitalize(entity.getName());
    String procName = entity.getStoredProcedure();
    logger.info("Processing {} for topic: {}. Calling stored proc: {} '{}', '{}'", entityLabel, keyTopic, procName, id, vals);
    repositoryMethod.accept(id, vals);
    completeLog();
}

private String prepareAndLog(String keyTopic, Entity entity, List<Long> ids) {
String idsString = ids.stream().map(String::valueOf).collect(Collectors.joining(","));
logger.info("Processing the {} message topic: {}. Calling stored proc: {} '{}'", entity.getName(), keyTopic,
logger.info("Processing {} for topic: {}. Calling stored proc: {} '{}'", StringUtils.capitalize(entity.getName()), keyTopic,
entity.getStoredProcedure(), idsString);
return idsString;
}

/**
 * Logs and executes the stored procedure for a single PHC id with its
 * associated values string, then emits the completion marker.
 */
private void processId(Long id, String vals, BiConsumer<Long, String> repositoryMethod, Entity entity) {
    String procName = entity.getStoredProcedure();
    logger.info("Processing PHC ID for {}. Calling stored proc: {} '{}', '{}'", entity.getName(), procName, id, vals);
    repositoryMethod.accept(id, vals);
    completeLog();
}

// Emits the shared "stored procedure execution completed" log marker; called
// after every stored-procedure invocation in this service.
private void completeLog() {
logger.info(SP_EXECUTION_COMPLETED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ spring:
patient: nrt_patient
provider: nrt_provider
notification: nrt_notifications
ldf_data: nrt_ldf_data
datamart: nbs_Datamart
dlq:
retry-suffix: -retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public void tearDown() throws Exception {
})
void testExtractIdFromMessage(String topic, String messageKey, Long expectedId) {
// Parameterized check: the uid embedded in the message key must be extracted
// for each topic (the same JSON is passed as both key and payload here).
Long extractedId = postProcessingServiceMock.extractIdFromMessage(topic, messageKey, messageKey);
assertNotNull(extractedId);
assertEquals(expectedId, extractedId);
}

Expand All @@ -81,6 +82,8 @@ void testPostProcessPatientMessage() {
String key = "{\"payload\":{\"patient_uid\":123}}";

postProcessingServiceMock.postProcessMessage(topic, key, key);
assertEquals(123L, postProcessingServiceMock.idCache.get(topic).get(0));

postProcessingServiceMock.processCachedIds();

String expectedPatientIdsString = "123";
Expand Down Expand Up @@ -141,6 +144,7 @@ void testPostProcessInvestigationMessage() {

List<ILoggingEvent> logs = listAppender.list;
assertEquals(5, logs.size());
assertTrue(logs.get(1).getFormattedMessage().contains(PostProcessingService.Entity.INVESTIGATION.getStoredProcedure()));
assertTrue(logs.get(4).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

Expand All @@ -158,6 +162,7 @@ void testPostProcessNotificationMessage() {

List<ILoggingEvent> logs = listAppender.list;
assertEquals(3, logs.size());
assertTrue(logs.get(1).getFormattedMessage().contains(PostProcessingService.Entity.NOTIFICATIONS.getStoredProcedure()));
assertTrue(logs.get(2).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

Expand Down Expand Up @@ -185,6 +190,24 @@ void testPostProcessPageBuilder() {
assertTrue(logs.get(6).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

@Test
void testPostProcessLdfData() {
String topic = "dummy_ldf_data";
String key = "{\"payload\":{\"ldf_uid\":123}}";

// Consume one LDF message, then drain the id cache so the stored proc is invoked.
postProcessingServiceMock.postProcessMessage(topic, key, key);
postProcessingServiceMock.processCachedIds();

// The cached uid should be passed to the repository as a comma-separated string.
String expectedLdfIdsString = "123";
verify(postProcRepositoryMock).executeStoredProcForLdfIds(expectedLdfIdsString);
assertTrue(postProcessingServiceMock.idCache.containsKey(topic));

// Asserted log shape: entry 1 names the LDF stored procedure, entry 2 is the
// completion marker (entry 0 presumably logs message intake — not asserted here).
List<ILoggingEvent> logs = listAppender.list;
assertEquals(3, logs.size());
assertTrue(logs.get(1).getFormattedMessage().contains(PostProcessingService.Entity.LDF_DATA.getStoredProcedure()));
assertTrue(logs.get(2).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

@Test
void testPostProcessMultipleMessages() {
String orgKey1 = "{\"payload\":{\"organization_uid\":123}}";
Expand Down Expand Up @@ -225,31 +248,34 @@ void testPostProcessCacheIdsPriority() {
String patientKey = "{\"payload\":{\"patient_uid\":125}}";
String investigationKey = "{\"payload\":{\"public_health_case_uid\":126}}";
String notificationKey = "{\"payload\":{\"notification_uid\":127}}";
String ldfKey = "{\"payload\":{\"ldf_uid\":127}}";

String orgTopic = "dummy_organization";
String providerTopic = "dummy_provider";
String patientTopic = "dummy_patient";
String invTopic = "dummy_investigation";
String notfTopic = "dummy_notifications";
String ntfTopic = "dummy_notifications";
String ldfTopic = "dummy_ldf_data";

postProcessingServiceMock.postProcessMessage(invTopic, investigationKey, investigationKey);
postProcessingServiceMock.postProcessMessage(providerTopic, providerKey, providerKey);
postProcessingServiceMock.postProcessMessage(patientTopic, patientKey, patientKey);
postProcessingServiceMock.postProcessMessage(notfTopic, notificationKey, notificationKey);
postProcessingServiceMock.postProcessMessage(ntfTopic, notificationKey, notificationKey);
postProcessingServiceMock.postProcessMessage(orgTopic, orgKey, orgKey);
postProcessingServiceMock.postProcessMessage(ldfTopic, ldfKey, ldfKey);
postProcessingServiceMock.processCachedIds();

List<ILoggingEvent> logs = listAppender.list;
assertEquals(17, logs.size());

List<String> topicLogList = logs.stream().map(ILoggingEvent::getFormattedMessage).filter(m -> m.contains(
"message topic")).toList();
List<String> topicLogList = logs.stream().map(ILoggingEvent::getFormattedMessage).filter(m -> m.matches(
"Processing .+ for topic: .*")).toList();
assertTrue(topicLogList.get(0).contains(orgTopic));
assertTrue(topicLogList.get(1).contains(providerTopic));
assertTrue(topicLogList.get(2).contains(patientTopic));
assertTrue(topicLogList.get(3).contains(invTopic));
assertTrue(topicLogList.get(4).contains(invTopic));
assertTrue(topicLogList.get(5).contains(notfTopic));
assertTrue(topicLogList.get(5).contains(ntfTopic));
assertTrue(topicLogList.get(6).contains(ldfTopic));
}

@Test
Expand Down