Skip to content

Commit

Permalink
CNDE-2064: Pre-processing service integration for User Profile update (
Browse files Browse the repository at this point in the history
  • Loading branch information
sveselev authored Jan 22, 2025
1 parent 4f6dd66 commit 850f320
Show file tree
Hide file tree
Showing 16 changed files with 282 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.verify;

public class KafkaProducerServiceTest {
class KafkaProducerServiceTest {

@Mock
private KafkaTemplate<String, String> kafkaTemplate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

public class ObservationControllerTest {
class ObservationControllerTest {

private MockMvc mockMvc;

Expand All @@ -30,13 +30,13 @@ public class ObservationControllerTest {
private ObservationController observationController;

@BeforeEach
public void setUp() {
void setUp() {
MockitoAnnotations.openMocks(this);
mockMvc = MockMvcBuilders.standaloneSetup(observationController).build();
}

@Test
public void publishMessageToKafkaTest() throws Exception {
void publishMessageToKafkaTest() throws Exception {
String jsonData = "{\"key\":\"value\"}";

mockMvc.perform(post("/publish")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import static org.mockito.Mockito.verify;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class KafkaProducerServiceTest {
class KafkaProducerServiceTest {

@Mock
private KafkaTemplate<String, String> kafkaTemplate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
public class PersonServiceController {

private final PersonStatusService dataPipelineStatusSvc;

private final KafkaTemplate<String, String> kafkaTemplate;

private static final String PRODUCED = "Produced : ";

@Value("${spring.kafka.input.topic-name}")
private String personTopicName = "nbs_Person";

@Value("${spring.kafka.input.topic-name-user}")
private String userTopicName = "nbs_Auth_user";

@GetMapping("/reporting/person-svc/status")
public ResponseEntity<String> getDataPipelineStatusHealth() {
return this.dataPipelineStatusSvc.getHealthStatus();
Expand All @@ -29,7 +33,7 @@ public ResponseEntity<String> getDataPipelineStatusHealth() {
public ResponseEntity<String> postProvider(@RequestBody String payLoad) {
try {
kafkaTemplate.send(personTopicName, UUID.randomUUID().toString(), payLoad);
return ResponseEntity.ok("Produced : " + payLoad);
return ResponseEntity.ok(PRODUCED + payLoad);
} catch (Exception ex) {
return ResponseEntity.internalServerError().body("Failed to process the provider. Exception : " + ex.getMessage());
}
Expand All @@ -39,10 +43,19 @@ public ResponseEntity<String> postProvider(@RequestBody String payLoad) {
public ResponseEntity<String> postPatient(@RequestBody String payLoad) {
try {
kafkaTemplate.send(personTopicName, UUID.randomUUID().toString(), payLoad);
return ResponseEntity.ok("Produced : " + payLoad);
return ResponseEntity.ok(PRODUCED + payLoad);
} catch (Exception ex) {
return ResponseEntity.internalServerError().body("Failed to process the Patient. Exception : " + ex.getMessage());
}
}

@PostMapping(value = "/reporting/person-svc/user")
public ResponseEntity<String> postUser(@RequestBody String payLoad) {
try {
kafkaTemplate.send(userTopicName, UUID.randomUUID().toString(), payLoad);
return ResponseEntity.ok(PRODUCED + payLoad);
} catch (Exception ex) {
return ResponseEntity.internalServerError().body("Failed to process the User. Exception : " + ex.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
*/
@Slf4j
@Data
@Builder
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Builder @NoArgsConstructor @AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class PatientSp {
@Id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
*/
@Slf4j
@Data
@Builder
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Builder @NoArgsConstructor @AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class ProviderSp {
@Id
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package gov.cdc.etldatapipeline.person.model.dto.user;

import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Entity
@Builder @NoArgsConstructor @AllArgsConstructor
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class AuthUser {
@Id
@Column(name = "auth_user_uid")
private Long authUserUid;

@Column(name = "user_id")
private String userId;

@Column(name = "firstNm")
private String firstNm;

@Column(name = "lastNm")
private String lastNm;

@Column(name = "nedss_entry_id")
private Long nedssEntryId;

@Column(name = "provider_uid")
private Long providerUid;

@Column(name = "add_user_id")
private Long addUserId;

@Column(name = "last_chg_user_id")
private Long lastChgUserId;

@Column(name = "add_time")
private String addTime;

@Column(name = "last_chg_time")
private String lastChgTime;

@Column(name = "record_status_cd")
private String recordStatusCd;

@Column(name = "record_status_time")
private String recordStatusTime;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package gov.cdc.etldatapipeline.person.model.dto.user;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.*;

@Data
@Builder @NoArgsConstructor @AllArgsConstructor
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class AuthUserKey {
@NonNull
@JsonProperty("auth_user_uid")
private Long authUserUid;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,5 @@
@Repository
public interface PatientRepository extends JpaRepository<PatientSp, String> {
@Query(nativeQuery = true, value = "execute sp_Patient_Event :person_uids")
List<PatientSp> computePatients(@Param("person_uids") String person_uids);

List<PatientSp> computePatients(@Param("person_uids") String personUids);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,5 @@
@Repository
public interface ProviderRepository extends JpaRepository<ProviderSp, String> {
@Query(nativeQuery = true, value = "execute sp_provider_event :person_uids")
List<ProviderSp> computeProviders(@Param("person_uids") String person_uids);

List<ProviderSp> computeProviders(@Param("person_uids") String personUids);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package gov.cdc.etldatapipeline.person.repository;

import gov.cdc.etldatapipeline.person.model.dto.user.AuthUser;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

import java.util.List;
import java.util.Optional;

public interface UserRepository extends JpaRepository<AuthUser, String> {
@Query(nativeQuery = true, value = "execute sp_auth_user_event :user_uids")
Optional<List<AuthUser>> computeAuthUsers(@Param("user_uids") String userUids);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
import gov.cdc.etldatapipeline.commonutil.NoDataException;
import gov.cdc.etldatapipeline.person.model.dto.patient.PatientSp;
import gov.cdc.etldatapipeline.person.model.dto.provider.ProviderSp;
import gov.cdc.etldatapipeline.person.model.dto.user.AuthUser;
import gov.cdc.etldatapipeline.person.repository.PatientRepository;
import gov.cdc.etldatapipeline.person.repository.ProviderRepository;
import gov.cdc.etldatapipeline.person.repository.UserRepository;
import gov.cdc.etldatapipeline.person.transformer.PersonTransformers;
import gov.cdc.etldatapipeline.person.transformer.PersonType;
import jakarta.persistence.EntityNotFoundException;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.errors.SerializationException;
Expand All @@ -28,19 +31,28 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static gov.cdc.etldatapipeline.commonutil.UtilHelper.extractUid;

@Service
@Setter
@Slf4j
@Setter
@RequiredArgsConstructor
public class PersonService {
private final PatientRepository patientRepository;
private final ProviderRepository providerRepository;
private final UserRepository userRepository;
private final PersonTransformers transformer;

private final KafkaTemplate<String, String> kafkaTemplate;

@Value("${spring.kafka.input.topic-name}")
private String personTopic;

@Value("${spring.kafka.input.topic-name-user}")
private String userTopic;

@Value("${spring.kafka.output.patientElastic.topic-name}")
private String patientElasticSearchOutputTopic;

Expand All @@ -53,15 +65,11 @@ public class PersonService {
@Value("${spring.kafka.output.providerReporting.topic-name}")
private String providerReportingOutputTopic;

private static final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
private static String topicDebugLog = "Received Person with id: {} from topic: {}";
@Value("${spring.kafka.output.userReporting.topic-name}")
private String userReportingOutputTopic;

public PersonService(PatientRepository patientRepository, ProviderRepository providerRepository, PersonTransformers transformer, KafkaTemplate<String, String> kafkaTemplate) {
this.patientRepository = patientRepository;
this.providerRepository = providerRepository;
this.transformer = transformer;
this.kafkaTemplate = kafkaTemplate;
}
private static final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
private static String topicDebugLog = "Received {} with id: {} from topic: {}";

@RetryableTopic(
attempts = "${spring.kafka.consumer.max-retry}",
Expand All @@ -81,17 +89,28 @@ public PersonService(PatientRepository patientRepository, ProviderRepository pro
}
)
@KafkaListener(
topics = "${spring.kafka.input.topic-name}"
topics = {
"${spring.kafka.input.topic-name}",
"${spring.kafka.input.topic-name-user}"
}
)
public void processMessage(String message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
if (topic.equals(personTopic)) {
processPerson(message, topic);
} else if (topic.equals(userTopic)) {
processUser(message, topic);
}
}

private void processPerson(String message, String topic) {
String personUid = "";
try {
JsonNode jsonNode = objectMapper.readTree(message);
JsonNode payloadNode = jsonNode.get("payload").path("after");

personUid = extractUid(message, "person_uid");
log.info(topicDebugLog, personUid, topic);
log.info(topicDebugLog, "Person", personUid, topic);
List<PatientSp> personDataFromStoredProc = patientRepository.computePatients(personUid);
processPatientData(personDataFromStoredProc);

Expand Down Expand Up @@ -148,4 +167,30 @@ private void processPatientData(List<PatientSp> personDataFromStoredProc) {
log.debug("Patient Elastic: {}", elasticData != null ? elasticData : "");
});
}

private void processUser(String message, String topic) {
String userUid = "";
try {
userUid = extractUid(message, "auth_user_uid");
log.info(topicDebugLog, "User", userUid, topic);
Optional<List<AuthUser>> userData = userRepository.computeAuthUsers(userUid);

if (userData.isPresent() && !userData.get().isEmpty()) {
userData.get().forEach(authUser -> {
String jsonKey = transformer.buildUserKey(authUser);
String jsonValue = transformer.processData(authUser);
kafkaTemplate.send(userReportingOutputTopic, jsonKey, jsonValue);
log.info("User data (uid={}) sent to {}", authUser.getAuthUserUid(), userReportingOutputTopic);
});
} else {
throw new EntityNotFoundException("Unable to find AuthUser data for id(s): " + userUid);
}
} catch (EntityNotFoundException ex) {
throw new NoDataException(ex.getMessage(), ex);
} catch (Exception e) {
String msg = "Error processing User data" +
(!userUid.isEmpty() ? " with ids '" + userUid + "': " : ": " + e.getMessage());
throw new RuntimeException(msg, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import gov.cdc.etldatapipeline.person.model.dto.provider.ProviderKey;
import gov.cdc.etldatapipeline.person.model.dto.provider.ProviderReporting;
import gov.cdc.etldatapipeline.person.model.dto.provider.ProviderSp;
import gov.cdc.etldatapipeline.person.model.dto.user.AuthUser;
import gov.cdc.etldatapipeline.person.model.dto.user.AuthUserKey;
import org.springframework.stereotype.Component;

@Component
Expand All @@ -24,6 +26,10 @@ public String buildProviderKey(ProviderSp p) {
return jsonGenerator.generateStringJson(ProviderKey.builder().providerUid(p.getPersonUid()).build());
}

public String buildUserKey(AuthUser u) {
return jsonGenerator.generateStringJson(AuthUserKey.builder().authUserUid(u.getAuthUserUid()).build(), "auth_user_uid");
}

public String processData(PatientSp patientSp, PersonType personType) {
return jsonGenerator.generateStringJson(processData(patientSp, null, personType));
}
Expand All @@ -32,6 +38,10 @@ public String processData(ProviderSp providerSp, PersonType personType) {
return jsonGenerator.generateStringJson(processData(null, providerSp, personType));
}

public String processData(AuthUser user) {
return jsonGenerator.generateStringJson(user, "auth_user_uid");
}

public PersonExtendedProps processData(PatientSp patientSp, ProviderSp providerSp,
PersonType personType) {
PersonExtendedProps transformedObj =
Expand Down
5 changes: 3 additions & 2 deletions person-service/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ spring:
key-serializer: org.springframework.kafka.support.serializer.StringOrBytesSerializer
input:
topic-name: ${INPUT_TOPIC_PERSON:nbs_Person}
defaultData:
topic-name: nbs_default
topic-name-user: ${INPUT_TOPIC_USER:nbs_Auth_user}
output:
providerElastic:
topic-name: ${PROVIDER_ELASTIC:elastic_search_provider}
Expand All @@ -16,6 +15,8 @@ spring:
topic-name: ${PATIENT_ELASTIC:elastic_search_patient}
patientReporting:
topic-name: ${PATIENT_REPORTING:nrt_patient}
userReporting:
topic-name: ${USER_REPORTING:nrt_auth_user}
dlq:
retry-suffix: _retry
dlq-suffix: _dlt
Expand Down
Loading

0 comments on commit 850f320

Please sign in to comment.