Skip to content

Commit

Permalink
CNDE-1805: RTR Observation: Add new columns and logic (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
sveselev authored Oct 10, 2024
1 parent 9ad7ca3 commit b467647
Show file tree
Hide file tree
Showing 22 changed files with 865 additions and 432 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package gov.cdc.etldatapipeline.investigation.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import gov.cdc.etldatapipeline.commonutil.NoDataException;
import gov.cdc.etldatapipeline.commonutil.json.CustomJsonGeneratorImpl;
import gov.cdc.etldatapipeline.investigation.repository.model.dto.NotificationUpdate;
Expand Down Expand Up @@ -49,7 +47,6 @@ public class InvestigationService {

private static final Logger logger = LoggerFactory.getLogger(InvestigationService.class);
private final ExecutorService phcExecutor = Executors.newFixedThreadPool(nProc*2, new CustomizableThreadFactory("phc-"));
private static final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

@Value("${spring.kafka.input.topic-name-phc}")
private String investigationTopic;
Expand Down Expand Up @@ -128,7 +125,7 @@ public void processInvestigation(String value) {
// only process and send notifications when investigation data has been sent
.whenComplete((res, ex) ->
logger.info("Investigation data (uid={}) sent to {}", phcUid, investigationTopicReporting))
.thenRunAsync(() -> processDataUtil.processNotifications(investigation.getInvestigationNotifications(), objectMapper))
.thenRunAsync(() -> processDataUtil.processNotifications(investigation.getInvestigationNotifications()))
.join();
} else {
throw new EntityNotFoundException("Unable to find Investigation with id: " + publicHealthCaseUid);
Expand All @@ -151,7 +148,7 @@ public void processNotification(String value) {
Optional<NotificationUpdate> notificationData = notificationRepository.computeNotifications(notificationUid);
if (notificationData.isPresent()) {
NotificationUpdate notification = notificationData.get();
processDataUtil.processNotifications(notification.getInvestigationNotifications(), objectMapper);
processDataUtil.processNotifications(notification.getInvestigationNotifications());
} else {
throw new EntityNotFoundException("Unable to find Notification with id; " + notificationUid );
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void testTransformInvestigationError(){
investigation.setInvestigationCaseAnswer(invalidJSON);

transformer.transformInvestigationData(investigation);
transformer.processNotifications(invalidJSON, objectMapper);
transformer.processNotifications(invalidJSON);

List<ILoggingEvent> logs = listAppender.list;
logs.forEach(le -> assertTrue(le.getFormattedMessage().contains(invalidJSON)));
Expand All @@ -138,10 +138,10 @@ void testObservationNotificationIds() throws JsonProcessingException {

InvestigationObservation observation = new InvestigationObservation();
observation.setPublicHealthCaseUid(investigationUid);
observation.setObservationId(263748596L);
observation.setObservationId(263748599L);

transformer.transformInvestigationData(investigation);
verify(kafkaTemplate).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
verify(kafkaTemplate, times(2)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
assertEquals(OBSERVATION_TOPIC, topicCaptor.getValue());

var actualObservation = objectMapper.readValue(
Expand All @@ -165,7 +165,7 @@ void testProcessNotifications() throws JsonProcessingException {

when(kafkaTemplate.send(anyString(), anyString(), anyString())).thenReturn(CompletableFuture.completedFuture(null));

transformer.processNotifications(investigation.getInvestigationNotifications(), new ObjectMapper());
transformer.processNotifications(investigation.getInvestigationNotifications());
verify(kafkaTemplate, times (1)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
assertEquals(NOTIFICATIONS_TOPIC, topicCaptor.getValue());

Expand All @@ -185,8 +185,8 @@ void testProcessMissingOrInvalidNotifications() {
investigation.setPublicHealthCaseUid(investigationUid);
investigation.setInvestigationNotifications(null);
transformer.investigationNotificationsOutputTopicName = NOTIFICATIONS_TOPIC;
transformer.processNotifications(null, new ObjectMapper());
transformer.processNotifications("{\"foo\":\"bar\"}", new ObjectMapper());
transformer.processNotifications(null);
transformer.processNotifications("{\"foo\":\"bar\"}");
verify(kafkaTemplate, never()).send(eq(NOTIFICATIONS_TOPIC), anyString(), anyString());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ private void validateInvestigationData(String payload, Investigation investigati
investigationKey.setPublicHealthCaseUid(investigation.getPublicHealthCaseUid());
final InvestigationReporting reportingModel = constructInvestigationReporting(investigation.getPublicHealthCaseUid());

verify(kafkaTemplate, times(5)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());
verify(kafkaTemplate, times(6)).send(topicCaptor.capture(), keyCaptor.capture(), messageCaptor.capture());

String actualTopic = topicCaptor.getAllValues().get(3);
String actualKey = keyCaptor.getAllValues().get(3);
String actualValue = messageCaptor.getAllValues().get(3);
String actualTopic = topicCaptor.getAllValues().get(4);
String actualKey = keyCaptor.getAllValues().get(4);
String actualValue = messageCaptor.getAllValues().get(4);

var actualReporting = objectMapper.readValue(
objectMapper.readTree(actualValue).path("payload").toString(), InvestigationReporting.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@
"act_last_chg_user_id": null,
"last_chg_user_name": null,
"act_last_chg_time": "2023-03-01T07:41:48.640"
},
{
"source_act_uid": 263748599,
"public_health_case_uid": 234567890,
"source_class_cd": "OBS",
"target_class_cd": "CASE",
"act_type_cd": "MorbReport",
"status_cd": "A",
"act_add_time": "2023-03-01T07:41:48.640",
"act_add_user_id": null,
"add_user_name": null,
"act_last_chg_user_id": null,
"last_chg_user_name": null,
"act_last_chg_time": "2023-03-01T07:41:48.640"
}

]
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ databaseChangeLog:
author: liquibase
changes:
- sqlFile:
path: 012-create-nrt-investigation-notification-002.sql
path: 012-create_nrt_investigation_notification-002.sql
splitStatements: false
- changeSet:
id: 14
Expand Down Expand Up @@ -355,4 +355,11 @@ databaseChangeLog:
changes:
- sqlFile:
path: 017-sp_d_labtest_result_postprocessing-001.sql
splitStatements: false
splitStatements: false
- changeSet:
id: 51
author: liquibase
changes:
- sqlFile:
path: 017-create_nrt_observation-004.sql
splitStatements: false
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,4 @@ IF EXISTS (SELECT 1 FROM sysobjects WHERE name = 'nrt_observation' and xtype = '
ADD ctrl_cd_display_form varchar(20);
END;

IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE Name = N'obs_domain_cd_st_1' AND Object_ID = Object_ID(N'nrt_observation'))
BEGIN
ALTER TABLE nrt_observation
ADD obs_domain_cd_st_1 varchar(20);

END;
IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE Name = N'processing_decision_cd' AND Object_ID = Object_ID(N'nrt_observation'))
BEGIN
ALTER TABLE nrt_observation
ADD processing_decision_cd varchar(20);

END;

IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE Name = N'cd' AND Object_ID = Object_ID(N'nrt_observation'))
BEGIN
ALTER TABLE nrt_observation
ADD cd varchar(50);
END;

IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE Name = N'shared_ind' AND Object_ID = Object_ID(N'nrt_observation'))
BEGIN
ALTER TABLE nrt_observation
ADD shared_ind char(1);
END;

END;
Original file line number Diff line number Diff line change
@@ -1,34 +1,5 @@
IF EXISTS (SELECT 1 FROM sysobjects WHERE name = 'nrt_observation' and xtype = 'U')
BEGIN
IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE name = N'ctrl_cd_display_form' AND object_id = Object_ID(N'nrt_observation'))
BEGIN
ALTER TABLE nrt_observation
ADD ctrl_cd_display_form varchar(20);
END;

IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE name = N'obs_domain_cd_st_1' AND object_id = Object_ID(N'nrt_observation'))
BEGIN
ALTER TABLE nrt_observation
ADD obs_domain_cd_st_1 varchar(20);
END;

IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE name = N'processing_decision_cd' AND object_id = Object_ID(N'nrt_observation'))
BEGIN
ALTER TABLE nrt_observation
ADD processing_decision_cd varchar(20);
END;

IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE name = N'cd' AND object_id = Object_ID(N'nrt_observation'))
BEGIN
ALTER TABLE nrt_observation
ADD cd varchar(50);
END;

IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE name = N'shared_ind' AND object_id = Object_ID(N'nrt_observation'))
BEGIN
ALTER TABLE nrt_observation
ADD shared_ind char(1);
END;

IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE name = N'status_cd' AND object_id = Object_ID(N'nrt_observation'))
BEGIN
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
IF EXISTS (SELECT 1 FROM sysobjects WHERE name = 'nrt_observation' and xtype = 'U')
BEGIN

IF NOT EXISTS(SELECT 1 FROM sys.columns WHERE Name = N'cd_desc_txt' AND Object_ID = Object_ID(N'nrt_observation'))
BEGIN
EXEC sys.sp_rename N'nrt_observation.cd_desc_text', N'cd_desc_txt', 'COLUMN';
END;

END;
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class Observation {
private String obsDomainCdSt1;

@Column(name = "cd_desc_txt")
private String cdDescText;
private String cdDescTxt;

@Column(name = "record_status_cd")
private String recordStatusCd;
Expand Down Expand Up @@ -120,6 +120,9 @@ public class Observation {
@Column(name = "txt")
private String txt;

@Column(name = "priority_cd")
private String priorityCd;

@Column(name = "add_user_id")
private Long addUserId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,45 @@

import lombok.*;

@Setter
@Getter
@Data
public class ObservationTransformed {
private Long orderingPersonId;
private Long observationUid;
private Long reportObservationUid;
private Long reportSprtUid;
private Long reportRefrUid;
private String resultObservationUid;
private String followUpObservationUid;

private Long patientId;
private Long performingOrganizationId;
private Long orderingPersonId;
private Long morbPhysicianId;
private Long morbReporterId;
private Long morbHospReporterId;
private Long morbHospId;

private Long transcriptionistId;
private String transcriptionistVal;
private String transcriptionistFirstNm;
private String transcriptionistLastNm;
private String transcriptionistIdAssignAuth;
private String transcriptionistAuthType;

private Long assistantInterpreterId;
private String assistantInterpreterVal;
private String assistantInterpreterFirstNm;
private String assistantInterpreterLastNm;
private String assistantInterpreterIdAssignAuth;
private String assistantInterpreterAuthType;

private Long resultInterpreterId;
private Long specimenCollectorId;
private Long copyToProviderId;
private Long labTestTechnicianId;
private Long authorOrganizationId;
private Long orderingOrganizationId;
private Long performingOrganizationId;
private Long healthCareId;

private String accessionNumber;
private Long materialId;
private String resultObservationUid;
private String followUpObservationUid;
private Long reportObservationUid;
private Long reportSprtUid;
private Long reportRefrUid;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ public class ObservationReporting {
private String classCd;
private String moodCd;
private Long actUid;
private String cdDescText;
private String recordStatusCd;
private String jurisdictionCd;
private Long programJurisdictionOid;
Expand All @@ -31,6 +30,7 @@ public class ObservationReporting {
private String ctrlCdDisplayForm;
private String processingDecisionCd;
private String cd;
private String cdDescTxt;
private String sharedInd;

private String statusCd;
Expand Down Expand Up @@ -59,25 +59,34 @@ public class ObservationReporting {
private Long orderingPersonId;
private Long morbPhysicianId;
private Long morbReporterId;
private Long morbHospReporterId;
private Long morbHospId;

private Long transcriptionistId;
private String transcriptionistVal;
private String transcriptionistFirstNm;
private String transcriptionistLastNm;
private String transcriptionistIdAssignAuth;
private String transcriptionistAuthType;

private Long assistantInterpreterId;
private String assistantInterpreterVal;
private String assistantInterpreterFirstNm;
private String assistantInterpreterLastNm;
private String assistantInterpreterIdAssignAuth;
private String assistantInterpreterAuthType;

private Long resultInterpreterId;
private Long specimenCollectorId;
private Long copyToProviderId;
private Long labTestTechnicianId;

private Long authorOrganizationId;
private Long orderingOrganizationId;
private Long performingOrganizationId;
private Long healthCareId;
private Long morbHospReporterId;

private String accessionNumber;
private String priorityCd;
private Long materialId;
private Long addUserId;
private String addUserName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private void processObservation(String value) {
if(observationData.isPresent()) {
ObservationReporting reportingModel = modelMapper.map(observationData.get(), ObservationReporting.class);
ObservationTransformed observationTransformed = processObservationDataUtil.transformObservationData(observationData.get());
buildReportingModelForTransformedData(reportingModel, observationTransformed);
modelMapper.map(observationTransformed, reportingModel);
pushKeyValuePairToKafka(observationKey, reportingModel, observationTopicOutputReporting);
logger.info("Observation data (uid={}) sent to {}", observationUid, observationTopicOutputReporting);
}
Expand All @@ -114,19 +114,4 @@ private void pushKeyValuePairToKafka(ObservationKey observationKey, Object model
String jsonValue = jsonGenerator.generateStringJson(model);
kafkaTemplate.send(topicName, jsonKey, jsonValue);
}

protected void buildReportingModelForTransformedData(ObservationReporting reportingModel, ObservationTransformed observationTransformed) {
reportingModel.setOrderingPersonId(observationTransformed.getOrderingPersonId());
reportingModel.setPatientId(observationTransformed.getPatientId());
reportingModel.setPerformingOrganizationId(observationTransformed.getPerformingOrganizationId());
reportingModel.setAuthorOrganizationId(observationTransformed.getAuthorOrganizationId());
reportingModel.setOrderingOrganizationId(observationTransformed.getOrderingOrganizationId());
reportingModel.setMaterialId(observationTransformed.getMaterialId());
reportingModel.setResultObservationUid(observationTransformed.getResultObservationUid());
reportingModel.setFollowupObservationUid(observationTransformed.getFollowUpObservationUid());
reportingModel.setReportObservationUid(Optional.ofNullable(observationTransformed.getReportObservationUid())
.orElse(reportingModel.getObservationUid()));
reportingModel.setReportRefrUid(observationTransformed.getReportRefrUid());
reportingModel.setReportSprtUid(observationTransformed.getReportSprtUid());
}
}
Loading

0 comments on commit b467647

Please sign in to comment.