Skip to content

Commit

Permalink
CNDIT-1558 Investigation confirmation method date transformation fix
Browse files Browse the repository at this point in the history
  • Loading branch information
sveselev committed Aug 29, 2024
1 parent 550e2f1 commit 4239f34
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package gov.cdc.etldatapipeline.investigation.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import gov.cdc.etldatapipeline.commonutil.json.CustomJsonGeneratorImpl;
Expand All @@ -13,8 +14,6 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -56,10 +55,10 @@ public InvestigationTransformed transformInvestigationData(Investigation investi

public void processNotifications(String investigationNotifications, ObjectMapper objectMapper) {
try {
JsonNode investigationNotificationsJsonArray = investigationNotifications != null ? objectMapper.readTree(investigationNotifications) : null;
JsonNode investigationNotificationsJsonArray = parseJsonArray(investigationNotifications, objectMapper);
InvestigationNotificationsKey investigationNotificationsKey = new InvestigationNotificationsKey();

if (investigationNotificationsJsonArray != null && investigationNotificationsJsonArray.isArray()) {
if (investigationNotificationsJsonArray != null) {
for (JsonNode node : investigationNotificationsJsonArray) {
Long notificationUid = node.get("notification_uid").asLong();
investigationNotificationsKey.setNotificationUid(notificationUid);
Expand All @@ -80,25 +79,25 @@ public void processNotifications(String investigationNotifications, ObjectMapper
}
}


private void transformPersonParticipations(String personParticipations, InvestigationTransformed investigationTransformed, ObjectMapper objectMapper) {
try {
JsonNode personParticipationsJsonArray = personParticipations != null ? objectMapper.readTree(personParticipations) : null;
JsonNode personParticipationsJsonArray = parseJsonArray(personParticipations, objectMapper);

if(personParticipationsJsonArray != null && personParticipationsJsonArray.isArray()) {
for(JsonNode node : personParticipationsJsonArray) {
if (personParticipationsJsonArray != null) {
for (JsonNode node : personParticipationsJsonArray) {
String typeCode = node.get("type_cd").asText();
String subjectClassCode = node.get("subject_class_cd").asText();
String personCode = node.get("person_cd").asText();
Long entityId = node.get("entity_id").asLong();

if(typeCode.equals("InvestgrOfPHC") && subjectClassCode.equals("PSN") && personCode.equals("PRV")) {
investigationTransformed.setInvestigatorId(node.get("entity_id").asLong());
if (typeCode.equals("InvestgrOfPHC") && subjectClassCode.equals("PSN") && personCode.equals("PRV")) {
investigationTransformed.setInvestigatorId(entityId);
}
if(typeCode.equals("PhysicianOfPHC") && subjectClassCode.equals("PSN") && personCode.equals("PRV")) {
investigationTransformed.setPhysicianId(node.get("entity_id").asLong());
if (typeCode.equals("PhysicianOfPHC") && subjectClassCode.equals("PSN") && personCode.equals("PRV")) {
investigationTransformed.setPhysicianId(entityId);
}
if(typeCode.equals("SubjOfPHC") && subjectClassCode.equals("PSN") && personCode.equals("PAT")) {
investigationTransformed.setPatientId(node.get("entity_id").asLong());
if (typeCode.equals("SubjOfPHC") && subjectClassCode.equals("PSN") && personCode.equals("PAT")) {
investigationTransformed.setPatientId(entityId);
}
}
}
Expand All @@ -112,9 +111,9 @@ private void transformPersonParticipations(String personParticipations, Investig

private void transformOrganizationParticipations(String organizationParticipations, InvestigationTransformed investigationTransformed, ObjectMapper objectMapper) {
try {
JsonNode organizationParticipationsJsonArray = organizationParticipations != null ? objectMapper.readTree(organizationParticipations) : null;
JsonNode organizationParticipationsJsonArray = parseJsonArray(organizationParticipations, objectMapper);

if(organizationParticipationsJsonArray != null && organizationParticipationsJsonArray.isArray()) {
if(organizationParticipationsJsonArray != null) {
for(JsonNode node : organizationParticipationsJsonArray) {
String typeCode = node.get("type_cd").asText();
String subjectClassCode = node.get("subject_class_cd").asText();
Expand All @@ -134,21 +133,22 @@ private void transformOrganizationParticipations(String organizationParticipatio

private void transformActIds(String actIds, InvestigationTransformed investigationTransformed, ObjectMapper objectMapper) {
try {
JsonNode actIdsJsonArray = actIds != null ? objectMapper.readTree(actIds) : null;
JsonNode actIdsJsonArray = parseJsonArray(actIds, objectMapper);

if(actIdsJsonArray != null && actIdsJsonArray.isArray()) {
if(actIdsJsonArray != null) {
for(JsonNode node : actIdsJsonArray) {
int actIdSeq = node.get("act_id_seq").asInt();
String typeCode = node.get("type_cd").asText();
String rootExtension = node.get("root_extension_txt").asText();

if(typeCode.equals("STATE") && actIdSeq == 1) {
investigationTransformed.setInvStateCaseId(node.get("root_extension_txt").asText());
investigationTransformed.setInvStateCaseId(rootExtension);
}
if(typeCode.equals("CITY") && actIdSeq == 2) {
investigationTransformed.setCityCountyCaseNbr(node.get("root_extension_txt").asText());
investigationTransformed.setCityCountyCaseNbr(rootExtension);
}
if(typeCode.equals("LEGACY") && actIdSeq == 3) {
investigationTransformed.setLegacyCaseId(node.get("root_extension_txt").asText());
investigationTransformed.setLegacyCaseId(rootExtension);
}
}
}
Expand All @@ -162,11 +162,11 @@ private void transformActIds(String actIds, InvestigationTransformed investigati

private void transformObservationIds(String observationNotificationIds, InvestigationTransformed investigationTransformed, ObjectMapper objectMapper) {
try {
JsonNode investigationObservationIdsJsonArray = observationNotificationIds != null ? objectMapper.readTree(observationNotificationIds) : null;
JsonNode investigationObservationIdsJsonArray = parseJsonArray(observationNotificationIds, objectMapper);
InvestigationObservation investigationObservation = new InvestigationObservation();
List<Long> observationIds = new ArrayList<>();

if(investigationObservationIdsJsonArray != null && investigationObservationIdsJsonArray.isArray()) {
if(investigationObservationIdsJsonArray != null) {
for(JsonNode node : investigationObservationIdsJsonArray) {
String sourceClassCode = node.get("source_class_cd").asText();
String actTypeCode = node.get("act_type_cd").asText();
Expand Down Expand Up @@ -199,47 +199,35 @@ private void transformObservationIds(String observationNotificationIds, Investig

private void transformInvestigationConfirmationMethod(String investigationConfirmationMethod, ObjectMapper objectMapper) {
try {
JsonNode investigationConfirmationMethodJsonArray = investigationConfirmationMethod != null ? objectMapper.readTree(investigationConfirmationMethod) : null;
InvestigationConfirmationMethodKey investigationConfirmationMethodKey = new InvestigationConfirmationMethodKey();
InvestigationConfirmationMethod investigationConfirmation = new InvestigationConfirmationMethod();
Long publicHealthCaseUid;
Map<String, String> confirmationMethodMap = new HashMap<>();
Instant confirmationMethodTime = null;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
JsonNode investigationConfirmationMethodJsonArray = parseJsonArray(investigationConfirmationMethod, objectMapper);

// Redundant time variable in case if confirmation_method_time is null in all rows of the array
String phcLastChgTime = null;
if(investigationConfirmationMethodJsonArray != null) {
InvestigationConfirmationMethodKey investigationConfirmationMethodKey = new InvestigationConfirmationMethodKey();
InvestigationConfirmationMethod investigationConfirmation = new InvestigationConfirmationMethod();
Map<String, String> confirmationMethodMap = new HashMap<>();
String confirmationMethodTime = null;

// Redundant time variable in case if confirmation_method_time is null in all rows of the array
String phcLastChgTime = investigationConfirmationMethodJsonArray.get(0).get("phc_last_chg_time").asText();
Long publicHealthCaseUid = investigationConfirmationMethodJsonArray.get(0).get("public_health_case_uid").asLong();

if(investigationConfirmationMethodJsonArray != null && investigationConfirmationMethodJsonArray.isArray()) {
publicHealthCaseUid = investigationConfirmationMethodJsonArray.get(0).get("public_health_case_uid").asLong();
phcLastChgTime = investigationConfirmationMethodJsonArray.get(0).get("phc_last_chg_time").asText();
for(JsonNode node : investigationConfirmationMethodJsonArray) {
String confirmationMethodTimeString = node.get("confirmation_method_time").asText();
Instant currentInstant = null;

// While getting data from JSON node, it is considered as literal String and that is why the null check
// has equals for null String instead of null value.
if(confirmationMethodTimeString != null && !confirmationMethodTimeString.equals("null")) {
Date dateTime = sdf.parse(confirmationMethodTimeString);
currentInstant = dateTime.toInstant();
}
if (confirmationMethodTime == null || currentInstant.isAfter(confirmationMethodTime)) {
confirmationMethodTime = currentInstant;
JsonNode timeNode = node.get("confirmation_method_time");
if (timeNode != null && !timeNode.isNull()) {
confirmationMethodTime = timeNode.asText();
}
confirmationMethodMap.put(node.get("confirmation_method_cd").asText(), node.get("confirmation_method_desc_txt").asText());
}
investigationConfirmation.setPublicHealthCaseUid(publicHealthCaseUid);
investigationConfirmationMethodKey.setPublicHealthCaseUid(publicHealthCaseUid);

investigationConfirmation.setConfirmationMethodTime(
confirmationMethodTime == null ? phcLastChgTime : confirmationMethodTime);

if(confirmationMethodTime == null) {
investigationConfirmation.setConfirmationMethodTime(phcLastChgTime);
}
for(String key : confirmationMethodMap.keySet()) {
investigationConfirmation.setConfirmationMethodCd(key);
investigationConfirmation.setConfirmationMethodDescTxt(confirmationMethodMap.get(key));
investigationConfirmationMethodKey.setConfirmationMethodCd(key);
for(Map.Entry<String, String> entry : confirmationMethodMap.entrySet()) {
investigationConfirmation.setConfirmationMethodCd(entry.getKey());
investigationConfirmation.setConfirmationMethodDescTxt(entry.getValue());
investigationConfirmationMethodKey.setConfirmationMethodCd(entry.getKey());
String jsonKey = jsonGenerator.generateStringJson(investigationConfirmationMethodKey);
String jsonValue = jsonGenerator.generateStringJson(investigationConfirmation);
kafkaTemplate.send(investigationConfirmationOutputTopicName, jsonKey, jsonValue);
Expand All @@ -255,9 +243,9 @@ private void transformInvestigationConfirmationMethod(String investigationConfir

private void processInvestigationPageCaseAnswer(String investigationCaseAnswer, InvestigationTransformed investigationTransformed, ObjectMapper objectMapper) {
try {
JsonNode investigationCaseAnswerJsonArray = investigationCaseAnswer != null ? objectMapper.readTree(investigationCaseAnswer) : null;
JsonNode investigationCaseAnswerJsonArray = parseJsonArray(investigationCaseAnswer, objectMapper);

if(investigationCaseAnswerJsonArray != null && investigationCaseAnswerJsonArray.isArray()) {
if(investigationCaseAnswerJsonArray != null) {
Long actUid = investigationCaseAnswerJsonArray.get(0).get("act_uid").asLong();
List<InvestigationCaseAnswer> investigationCaseAnswerDataIfPresent = investigationCaseAnswerRepository.findByActUid(actUid);
List<InvestigationCaseAnswer> investigationCaseAnswerList = new ArrayList<>();
Expand Down Expand Up @@ -288,4 +276,13 @@ private void processInvestigationPageCaseAnswer(String investigationCaseAnswer,
logger.error("Error processing investigation case answer JSON array from investigation data: {}", e.getMessage());
}
}

private JsonNode parseJsonArray(String jsonString, ObjectMapper objectMapper) throws JsonProcessingException {
JsonNode jsonArray = jsonString != null ? objectMapper.readTree(jsonString) : null;
if (jsonArray != null) {
return jsonArray.isArray() ? jsonArray : null;
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import gov.cdc.etldatapipeline.investigation.repository.model.dto.*;
import gov.cdc.etldatapipeline.investigation.repository.rdb.InvestigationCaseAnswerRepository;
import gov.cdc.etldatapipeline.investigation.util.ProcessInvestigationDataUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -46,6 +47,8 @@ class InvestigationDataProcessingTests {
@Captor
private ArgumentCaptor<String> messageCaptor;

private AutoCloseable closeable;

private static final String FILE_PREFIX = "rawDataFiles/";
private static final String CONFIRMATION_TOPIC = "confirmationTopic";
private static final String OBSERVATION_TOPIC = "observationTopic";
Expand All @@ -59,10 +62,15 @@ class InvestigationDataProcessingTests {

@BeforeEach
void setUp() {
MockitoAnnotations.openMocks(this);
closeable = MockitoAnnotations.openMocks(this);
transformer = new ProcessInvestigationDataUtil(kafkaTemplate, investigationCaseAnswerRepository);
}

@AfterEach
void tearDown() throws Exception {
closeable.close();
}

@Test
void testConfirmationMethod() {
Investigation investigation = new Investigation();
Expand All @@ -79,7 +87,7 @@ void testConfirmationMethod() {
confirmationMethod.setPublicHealthCaseUid(investigationUid);
confirmationMethod.setConfirmationMethodCd("LD");
confirmationMethod.setConfirmationMethodDescTxt("Laboratory confirmed");
confirmationMethod.setConfirmationMethodTime("2024-01-15T10:20:57.647");
confirmationMethod.setConfirmationMethodTime("2024-01-15T10:20:57.001");

transformer.transformInvestigationData(investigation);
verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
"public_health_case_uid": 234567890,
"confirmation_method_cd": "LD",
"confirmation_method_desc_txt": "Laboratory confirmed",
"confirmation_method_time": null,
"confirmation_method_time": "2024-01-15T10:20:57.001",
"phc_last_chg_time": "2024-01-15T10:20:57.647"
},
{
"public_health_case_uid": 234567890,
"confirmation_method_cd": "LR",
"confirmation_method_desc_txt": "Laboratory report",
"confirmation_method_time": null,
"confirmation_method_time": "2024-01-15T10:20:57.001",
"phc_last_chg_time": "2024-01-15T10:20:57.647"
}
]

0 comments on commit 4239f34

Please sign in to comment.