Skip to content

Commit

Permalink
CNDIT-1364: LDF post-processing service integration (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
sveselev authored Aug 12, 2024
1 parent 125355b commit 5387e1c
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,4 @@ public class LdfDataKey {
@NonNull
@JsonProperty("ldf_uid")
private Long ldfUid;

@JsonProperty("business_object_uid")
private Long businessObjectUid;
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public String processLdfData(String value) {
Optional<LdfData> ldfData = ldfDataRepository.computeLdfData(busObjNm, ldfUid, busObjUid);
if (ldfData.isPresent()) {
ldfDataKey.setLdfUid(Long.valueOf(ldfUid));
ldfDataKey.setBusinessObjectUid(Long.valueOf(busObjUid));
pushKeyValuePairToKafka(ldfDataKey, ldfData.get(), ldfDataTopicReporting);
return objectMapper.writeValueAsString(ldfData.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import gov.cdc.etldatapipeline.ldfdata.repository.LdfDataRepository;
import gov.cdc.etldatapipeline.ldfdata.model.dto.LdfData;
import gov.cdc.etldatapipeline.ldfdata.model.dto.LdfDataKey;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -35,11 +36,17 @@ class LdfDataServiceTest {
@Captor
private ArgumentCaptor<String> messageCaptor;

private AutoCloseable closeable;
private final CustomJsonGeneratorImpl jsonGenerator = new CustomJsonGeneratorImpl();

@BeforeEach
void setUp() {
MockitoAnnotations.openMocks(this);
closeable=MockitoAnnotations.openMocks(this);
}

@AfterEach
void tearDown() throws Exception {
closeable.close();
}

@Test
Expand Down Expand Up @@ -100,7 +107,6 @@ private void validateData(String inputTopicName, String outputTopicName,

LdfDataKey ldfDataKey = new LdfDataKey();
ldfDataKey.setLdfUid(ldfData.getLdfUid());
ldfDataKey.setBusinessObjectUid(ldfData.getBusinessObjectUid());

String expectedKey = jsonGenerator.generateStringJson(ldfDataKey);
String expectedValue = jsonGenerator.generateStringJson(ldfData);
Expand All @@ -110,7 +116,6 @@ private void validateData(String inputTopicName, String outputTopicName,
assertEquals(expectedKey, keyCaptor.getValue());
assertEquals(expectedValue, messageCaptor.getValue());
assertTrue(keyCaptor.getValue().contains(String.valueOf(ldfDataKey.getLdfUid())));
assertTrue(keyCaptor.getValue().contains(String.valueOf(ldfDataKey.getBusinessObjectUid())));
}

private LdfDataService getInvestigationService(String inputTopicName, String outputTopicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ public interface PostProcRepository extends JpaRepository<PostProcSp, Long> {

@Procedure("sp_nrt_notification_postprocessing")
void executeStoredProcForNotificationIds(@Param("notificationUids") String notificationUids);

@Procedure("sp_nrt_ldf_postprocessing")
void executeStoredProcForLdfIds(@Param("ldf_uids") String ldfUids);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.util.*;
import java.util.Map.Entry;
Expand Down Expand Up @@ -54,13 +55,14 @@ public class PostProcessingService {
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

@Getter
private enum Entity {
enum Entity {
ORGANIZATION(1, "organization", "organization_uid", "sp_nrt_organization_postprocessing"),
PROVIDER(2, "provider", "provider_uid", "sp_nrt_provider_postprocessing"),
PATIENT(3, "patient", "patient_uid", "sp_nrt_patient_postprocessing"),
INVESTIGATION(4, "investigation", "public_health_case_uid", "sp_nrt_investigation_postprocessing"),
NOTIFICATIONS(5, "notifications", "notification_uid", "sp_nrt_notification_postprocessing"),
F_PAGE_CASE(0, "investigation", "public_health_case_uid", "sp_f_page_case_postprocessing"),
LDF_DATA(6, "ldf_data", "ldf_uid", "sp_nrt_ldf_postprocessing"),
F_PAGE_CASE(0, "fact page case", "public_health_case_uid", "sp_f_page_case_postprocessing"),
CASE_ANSWERS(0, "case answers", "public_health_case_uid", "sp_page_builder_postprocessing"),
UNKNOWN(-1, "unknown", "unknown_uid", "sp_nrt_unknown_postprocessing");

Expand Down Expand Up @@ -99,7 +101,8 @@ private enum Entity {
"${spring.kafka.topic.organization}",
"${spring.kafka.topic.patient}",
"${spring.kafka.topic.provider}",
"${spring.kafka.topic.notification}"
"${spring.kafka.topic.notification}",
"${spring.kafka.topic.ldf_data}"
})
public void postProcessMessage(
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Expand Down Expand Up @@ -135,16 +138,17 @@ public void postProcessDatamart(
JsonNode payloadNode = objectMapper.readTree(payload);

Datamart dmData = objectMapper.readValue(payloadNode.get(PAYLOAD).toString(), Datamart.class);
final String PAYLOAD_PFX = "For payload: " + payloadNode;
if (Objects.isNull(dmData)) {
throw new NoDataException("For payload: " + payloadNode + " DataMart object is null. Skipping further processing");
throw new NoDataException(PAYLOAD_PFX + " DataMart object is null. Skipping further processing");
}
Map<Long, Long> dmMap = new HashMap<>();
if (Objects.isNull(dmData.getPublicHealthCaseUid()) || Objects.isNull(dmData.getPatientUid())) {
throw new NoDataException("For payload: " + payloadNode + " DataMart Public Health Case/Patient Id is null. Skipping further processing");
throw new NoDataException(PAYLOAD_PFX + " DataMart Public Health Case/Patient Id is null. Skipping further processing");
}
dmMap.put(dmData.getPublicHealthCaseUid(), dmData.getPatientUid());
if (Objects.isNull(dmData.getDatamart())) {
throw new NoDataException("For payload: " + payload + " DataMart value is null. Skipping further processing");
throw new NoDataException(PAYLOAD_PFX + " DataMart value is null. Skipping further processing");
}
dmCache.computeIfAbsent(dmData.getDatamart(), k -> ConcurrentHashMap.newKeySet()).add(dmMap);
} catch (NoDataException nde) {
Expand Down Expand Up @@ -185,8 +189,8 @@ protected void processCachedIds() {

ids.forEach(id -> {
if (idVals.containsKey(id)) {
processId(id, idVals.get(id),
investigationRepository::executeStoredProcForPageBuilder, Entity.CASE_ANSWERS);
processTopic(keyTopic, Entity.CASE_ANSWERS, id, idVals.get(id),
investigationRepository::executeStoredProcForPageBuilder);
idVals.remove(id);
}
});
Expand All @@ -199,6 +203,10 @@ protected void processCachedIds() {
processTopic(keyTopic, entity, ids,
postProcRepository::executeStoredProcForNotificationIds);
break;
case LDF_DATA:
processTopic(keyTopic, entity, ids,
postProcRepository::executeStoredProcForLdfIds);
break;
default:
logger.warn("Unknown topic: {} cannot be processed", keyTopic);
break;
Expand All @@ -223,7 +231,7 @@ protected void processDatamartIds() {
String patients =
dmSet.stream().flatMap(m -> m.values().stream().map(String::valueOf)).collect(Collectors.joining(","));

logger.info("Processing the {} message topic. Calling stored proc: {}('{}','{}')", dmType,
logger.info("Processing {} message topic. Calling stored proc: {}('{}','{}')", dmType,
"sp_hepatitis_datamart_postprocessing", cases, patients);
investigationRepository.executeStoredProcForHepDatamart(cases, patients);
completeLog();
Expand All @@ -247,7 +255,7 @@ Long extractIdFromMessage(String topic, String messageKey, String payload) {
}
id = keyNode.get(PAYLOAD).get(entity.getUidName()).asLong();

if (topic.contains(Entity.INVESTIGATION.getName())) {
if (entity.equals(Entity.INVESTIGATION)) {
JsonNode tblNode = payloadNode.get(PAYLOAD).get("rdb_table_name_list");
if (tblNode != null && !tblNode.isNull()) {
idVals.put(id, tblNode.asText());
Expand Down Expand Up @@ -284,20 +292,20 @@ private <T> List<T> processTopic(String keyTopic, Entity entity, List<Long> ids,
return result;
}

private void processTopic(String keyTopic, Entity entity, Long id, String vals, BiConsumer<Long, String> repositoryMethod) {
logger.info("Processing {} for topic: {}. Calling stored proc: {} '{}', '{}'", StringUtils.capitalize(entity.getName()), keyTopic,
entity.getStoredProcedure(), id, vals);
repositoryMethod.accept(id, vals);
completeLog();
}

private String prepareAndLog(String keyTopic, Entity entity, List<Long> ids) {
String idsString = ids.stream().map(String::valueOf).collect(Collectors.joining(","));
logger.info("Processing the {} message topic: {}. Calling stored proc: {} '{}'", entity.getName(), keyTopic,
logger.info("Processing {} for topic: {}. Calling stored proc: {} '{}'", StringUtils.capitalize(entity.getName()), keyTopic,
entity.getStoredProcedure(), idsString);
return idsString;
}

private void processId(Long id, String vals, BiConsumer<Long, String> repositoryMethod, Entity entity) {
logger.info("Processing PHC ID for {}. Calling stored proc: {} '{}', '{}'", entity.getName(),
entity.getStoredProcedure(), id, vals);
repositoryMethod.accept(id, vals);
completeLog();
}

private void completeLog() {
logger.info(SP_EXECUTION_COMPLETED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ spring:
patient: nrt_patient
provider: nrt_provider
notification: nrt_notifications
ldf_data: nrt_ldf_data
datamart: nbs_Datamart
dlq:
retry-suffix: -retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public void tearDown() throws Exception {
})
void testExtractIdFromMessage(String topic, String messageKey, Long expectedId) {
Long extractedId = postProcessingServiceMock.extractIdFromMessage(topic, messageKey, messageKey);
assertNotNull(extractedId);
assertEquals(expectedId, extractedId);
}

Expand All @@ -81,6 +82,8 @@ void testPostProcessPatientMessage() {
String key = "{\"payload\":{\"patient_uid\":123}}";

postProcessingServiceMock.postProcessMessage(topic, key, key);
assertEquals(123L, postProcessingServiceMock.idCache.get(topic).get(0));

postProcessingServiceMock.processCachedIds();

String expectedPatientIdsString = "123";
Expand Down Expand Up @@ -141,6 +144,7 @@ void testPostProcessInvestigationMessage() {

List<ILoggingEvent> logs = listAppender.list;
assertEquals(5, logs.size());
assertTrue(logs.get(1).getFormattedMessage().contains(PostProcessingService.Entity.INVESTIGATION.getStoredProcedure()));
assertTrue(logs.get(4).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

Expand All @@ -158,6 +162,7 @@ void testPostProcessNotificationMessage() {

List<ILoggingEvent> logs = listAppender.list;
assertEquals(3, logs.size());
assertTrue(logs.get(1).getFormattedMessage().contains(PostProcessingService.Entity.NOTIFICATIONS.getStoredProcedure()));
assertTrue(logs.get(2).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

Expand Down Expand Up @@ -185,6 +190,24 @@ void testPostProcessPageBuilder() {
assertTrue(logs.get(6).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

@Test
void testPostProcessLdfData() {
String topic = "dummy_ldf_data";
String key = "{\"payload\":{\"ldf_uid\":123}}";

postProcessingServiceMock.postProcessMessage(topic, key, key);
postProcessingServiceMock.processCachedIds();

String expectedLdfIdsString = "123";
verify(postProcRepositoryMock).executeStoredProcForLdfIds(expectedLdfIdsString);
assertTrue(postProcessingServiceMock.idCache.containsKey(topic));

List<ILoggingEvent> logs = listAppender.list;
assertEquals(3, logs.size());
assertTrue(logs.get(1).getFormattedMessage().contains(PostProcessingService.Entity.LDF_DATA.getStoredProcedure()));
assertTrue(logs.get(2).getMessage().contains(PostProcessingService.SP_EXECUTION_COMPLETED));
}

@Test
void testPostProcessMultipleMessages() {
String orgKey1 = "{\"payload\":{\"organization_uid\":123}}";
Expand Down Expand Up @@ -225,31 +248,34 @@ void testPostProcessCacheIdsPriority() {
String patientKey = "{\"payload\":{\"patient_uid\":125}}";
String investigationKey = "{\"payload\":{\"public_health_case_uid\":126}}";
String notificationKey = "{\"payload\":{\"notification_uid\":127}}";
String ldfKey = "{\"payload\":{\"ldf_uid\":127}}";

String orgTopic = "dummy_organization";
String providerTopic = "dummy_provider";
String patientTopic = "dummy_patient";
String invTopic = "dummy_investigation";
String notfTopic = "dummy_notifications";
String ntfTopic = "dummy_notifications";
String ldfTopic = "dummy_ldf_data";

postProcessingServiceMock.postProcessMessage(invTopic, investigationKey, investigationKey);
postProcessingServiceMock.postProcessMessage(providerTopic, providerKey, providerKey);
postProcessingServiceMock.postProcessMessage(patientTopic, patientKey, patientKey);
postProcessingServiceMock.postProcessMessage(notfTopic, notificationKey, notificationKey);
postProcessingServiceMock.postProcessMessage(ntfTopic, notificationKey, notificationKey);
postProcessingServiceMock.postProcessMessage(orgTopic, orgKey, orgKey);
postProcessingServiceMock.postProcessMessage(ldfTopic, ldfKey, ldfKey);
postProcessingServiceMock.processCachedIds();

List<ILoggingEvent> logs = listAppender.list;
assertEquals(17, logs.size());

List<String> topicLogList = logs.stream().map(ILoggingEvent::getFormattedMessage).filter(m -> m.contains(
"message topic")).toList();
List<String> topicLogList = logs.stream().map(ILoggingEvent::getFormattedMessage).filter(m -> m.matches(
"Processing .+ for topic: .*")).toList();
assertTrue(topicLogList.get(0).contains(orgTopic));
assertTrue(topicLogList.get(1).contains(providerTopic));
assertTrue(topicLogList.get(2).contains(patientTopic));
assertTrue(topicLogList.get(3).contains(invTopic));
assertTrue(topicLogList.get(4).contains(invTopic));
assertTrue(topicLogList.get(5).contains(notfTopic));
assertTrue(topicLogList.get(5).contains(ntfTopic));
assertTrue(topicLogList.get(6).contains(ldfTopic));
}

@Test
Expand Down

0 comments on commit 5387e1c

Please sign in to comment.