Skip to content

Commit

Permalink
MODTLR-98 Add support for loan events
Browse files Browse the repository at this point in the history
  • Loading branch information
OleksandrVidinieiev committed Dec 6, 2024
1 parent b00bde1 commit 2c930aa
Show file tree
Hide file tree
Showing 9 changed files with 500 additions and 14 deletions.
23 changes: 22 additions & 1 deletion descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,28 @@
"permissionsRequired": ["tlr.pick-slips.collection.get"],
"modulePermissions": [
"user-tenants.collection.get",
"search.instances.collection.get",
"circulation-storage.requests.item.get",
"circulation-storage.requests.collection.get",
"users.item.get",
"users.collection.get",
"usergroups.item.get",
"usergroups.collection.get",
"departments.item.get",
"departments.collection.get",
"addresstypes.item.get",
"addresstypes.collection.get",
"inventory-storage.service-points.item.get",
"inventory-storage.service-points.collection.get",
"inventory-storage.instances.item.get",
"inventory-storage.instances.collection.get"
]
},
{
"methods": ["GET"],
"pathPattern": "/tlr/staff-slips/search-slips/{servicePointId}",
"permissionsRequired": ["tlr.search-slips.collection.get"],
"modulePermissions": [
"user-tenants.collection.get",
"circulation-storage.requests.item.get",
"circulation-storage.requests.collection.get",
"users.item.get",
Expand Down
29 changes: 24 additions & 5 deletions src/main/java/org/folio/listener/kafka/KafkaEventListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
import java.nio.charset.StandardCharsets;
import java.util.Optional;

import org.folio.domain.dto.Loan;
import org.folio.domain.dto.Request;
import org.folio.domain.dto.RequestsBatchUpdate;
import org.folio.domain.dto.User;
import org.folio.domain.dto.UserGroup;
import org.folio.exception.KafkaEventDeserializationException;
import org.folio.service.KafkaEventHandler;
import org.folio.service.impl.LoanEventHandler;
import org.folio.service.impl.RequestBatchUpdateEventHandler;
import org.folio.service.impl.RequestEventHandler;
import org.folio.service.impl.UserEventHandler;
Expand All @@ -34,18 +36,22 @@
public class KafkaEventListener {
private static final ObjectMapper objectMapper = new ObjectMapper();
private final RequestEventHandler requestEventHandler;
private final LoanEventHandler loanEventHandler;
private final UserGroupEventHandler userGroupEventHandler;
private final UserEventHandler userEventHandler;
private final SystemUserScopedExecutionService systemUserScopedExecutionService;
private final RequestBatchUpdateEventHandler requestBatchEventHandler;

public KafkaEventListener(@Autowired RequestEventHandler requestEventHandler,
@Autowired RequestBatchUpdateEventHandler requestBatchEventHandler,
@Autowired SystemUserScopedExecutionService systemUserScopedExecutionService,
@Autowired UserGroupEventHandler userGroupEventHandler,
@Autowired UserEventHandler userEventHandler) {
@Autowired
public KafkaEventListener(RequestEventHandler requestEventHandler,
LoanEventHandler loanEventHandler,
RequestBatchUpdateEventHandler requestBatchEventHandler,
SystemUserScopedExecutionService systemUserScopedExecutionService,
UserGroupEventHandler userGroupEventHandler,
UserEventHandler userEventHandler) {

this.requestEventHandler = requestEventHandler;
this.loanEventHandler = loanEventHandler;
this.systemUserScopedExecutionService = systemUserScopedExecutionService;
this.userGroupEventHandler = userGroupEventHandler;
this.requestBatchEventHandler = requestBatchEventHandler;
Expand Down Expand Up @@ -76,6 +82,19 @@ public void handleRequestBatchUpdateEvent(String eventString, MessageHeaders mes
log.info("handleRequestBatchUpdateEvent:: event consumed: {}", event::getId);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.circulation\\.loan",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleLoanEvent(String eventString, MessageHeaders messageHeaders) {
log.debug("handleLoanEvent:: event: {}", () -> eventString);
KafkaEvent<Loan> event = deserialize(eventString, messageHeaders, Loan.class);
log.info("handleLoanEvent:: event received: {}", event::getId);
log.info("handleLoanEvent:: event: {}", eventString);
handleEvent(event, loanEventHandler);
log.info("handleLoanEvent:: event consumed: {}", event::getId);
}

private <T> void handleEvent(KafkaEvent<T> event, KafkaEventHandler<T> handler) {
try {
systemUserScopedExecutionService.executeAsyncSystemUserScoped(CENTRAL_TENANT_ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ private AllowedServicePointsResponse getForCreate(AllowedServicePointsRequest re
Map<String, AllowedServicePointsInner> recall = new HashMap<>();
for (String tenantId : getLendingTenants(request)) {
var servicePoints = getAllowedServicePointsFromTenant(request, patronGroupId, tenantId);
log.info("getForCreate:: service points from {}: {}", tenantId, servicePoints);

combineAndFilterDuplicates(page, servicePoints.getPage());
combineAndFilterDuplicates(hold, servicePoints.getHold());
combineAndFilterDuplicates(recall, servicePoints.getRecall());
Expand Down
281 changes: 281 additions & 0 deletions src/main/java/org/folio/service/impl/LoanEventHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
package org.folio.service.impl;

import static org.folio.domain.dto.Request.EcsRequestPhaseEnum.PRIMARY;
import static org.folio.domain.dto.Request.EcsRequestPhaseEnum.SECONDARY;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.AWAITING_PICKUP;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.CANCELLED;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.ITEM_CHECKED_OUT;
import static org.folio.domain.dto.TransactionStatus.StatusEnum.OPEN;
import static org.folio.support.KafkaEvent.EventType.UPDATED;

import java.util.Date;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;

import org.folio.domain.dto.Loan;
import org.folio.domain.dto.Request;
import org.folio.domain.dto.Request.EcsRequestPhaseEnum;
import org.folio.domain.dto.Request.FulfillmentPreferenceEnum;
import org.folio.domain.dto.ServicePoint;
import org.folio.domain.dto.TransactionStatus;
import org.folio.domain.entity.EcsTlrEntity;
import org.folio.repository.EcsTlrRepository;
import org.folio.service.CloningService;
import org.folio.service.DcbService;
import org.folio.service.KafkaEventHandler;
import org.folio.service.RequestService;
import org.folio.service.ServicePointService;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.folio.support.KafkaEvent;
import org.springframework.stereotype.Service;

import feign.FeignException;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;

@AllArgsConstructor
@Service
@Log4j2
public class LoanEventHandler implements KafkaEventHandler<Loan> {

private final DcbService dcbService;
private final EcsTlrRepository ecsTlrRepository;
private final SystemUserScopedExecutionService executionService;
private final ServicePointService servicePointService;
private final CloningService<ServicePoint> servicePointCloningService;
private final RequestService requestService;

@Override
public void handle(KafkaEvent<Loan> event) {
log.info("handle:: processing loan event: {}", event::getId);
if (event.getType() == UPDATED) {
handleLoanUpdateEvent(event);
} else {
log.info("handle:: ignoring event {} of unsupported type: {}", event::getId, event::getType);
}
log.info("handle:: loan event processed: {}", event::getId);
}

private void handleLoanUpdateEvent(KafkaEvent<Loan> event) {
log.info("handleLoanUpdateEvent:: handling loan update event: {}", event::getId);
// Loan updatedRequest = event.getData().getNewVersion();
// if (updatedRequest == null) {
// log.warn("handleRequestUpdateEvent:: event does not contain new version of request");
// return;
// }
// if (updatedRequest.getEcsRequestPhase() == null) {
// log.info("handleRequestUpdateEvent:: updated request is not an ECS request");
// return;
// }
// if (updatedRequest.getEcsRequestPhase() == SECONDARY && updatedRequest.getItemId() == null) {
// log.info("handleRequestUpdateEvent:: updated secondary request does not contain itemId");
// return;
// }
// String requestId = updatedRequest.getId();
// log.info("handleRequestUpdateEvent:: looking for ECS TLR for request {}", requestId);
// // we can search by either primary or secondary request ID, they are identical
// ecsTlrRepository.findBySecondaryRequestId(UUID.fromString(requestId)).ifPresentOrElse(
// ecsTlr -> handleRequestUpdateEvent(ecsTlr, event),
// () -> log.info("handleSecondaryRequestUpdate: ECS TLR for request {} not found", requestId));
}

private void handleRequestUpdateEvent(EcsTlrEntity ecsTlr, KafkaEvent<Request> event) {
log.debug("handleRequestUpdateEvent:: ecsTlr={}", () -> ecsTlr);
Request updatedRequest = event.getData().getNewVersion();

if (!requestMatchesEcsTlr(ecsTlr, updatedRequest, event.getTenantIdHeaderValue())) {
return;
}
if (updatedRequest.getEcsRequestPhase() == PRIMARY) {
handlePrimaryRequestUpdate(ecsTlr, event);
}
if (updatedRequest.getEcsRequestPhase() == SECONDARY) {
handleSecondaryRequestUpdate(ecsTlr, event);
}
}

private static boolean requestMatchesEcsTlr(EcsTlrEntity ecsTlr, Request updatedRequest,
String updatedRequestTenant) {

final EcsRequestPhaseEnum updatedRequestPhase = updatedRequest.getEcsRequestPhase();
final UUID updatedRequestId = UUID.fromString(updatedRequest.getId());

if (updatedRequestPhase == PRIMARY && updatedRequestId.equals(ecsTlr.getPrimaryRequestId())
&& updatedRequestTenant.equals(ecsTlr.getPrimaryRequestTenantId())) {
log.info("requestMatchesEcsTlr:: updated primary request matches ECS TLR");
return true;
} else if (updatedRequestPhase == SECONDARY && updatedRequestId.equals(ecsTlr.getSecondaryRequestId())
&& updatedRequestTenant.equals(ecsTlr.getSecondaryRequestTenantId())) {
log.info("requestMatchesEcsTlr:: updated secondary request matches ECS TLR");
return true;
}
log.warn("requestMatchesEcsTlr:: request does not match ECS TLR: updatedRequestPhase={}, " +
"updatedRequestId={}, updatedRequestTenant={}, ecsTlr={}", updatedRequestPhase,
updatedRequestId, updatedRequestTenant, ecsTlr);
return false;
}

private void handlePrimaryRequestUpdate(EcsTlrEntity ecsTlr, KafkaEvent<Request> event) {
propagateChangesFromPrimaryToSecondaryRequest(ecsTlr, event);
updateTransactionStatuses(event, ecsTlr);
}

private void handleSecondaryRequestUpdate(EcsTlrEntity ecsTlr, KafkaEvent<Request> event) {
processItemIdUpdate(ecsTlr, event.getData().getNewVersion());
updateTransactionStatuses(event, ecsTlr);
}

private void processItemIdUpdate(EcsTlrEntity ecsTlr, Request updatedRequest) {
if (ecsTlr.getItemId() != null) {
log.info("processItemIdUpdate:: ECS TLR {} already has itemId {}", ecsTlr::getId, ecsTlr::getItemId);
return;
}
log.info("processItemIdUpdate:: updating ECS TLR {} with itemId {}", ecsTlr::getId,
updatedRequest::getItemId);
ecsTlr.setItemId(UUID.fromString(updatedRequest.getItemId()));
// TODO: change this if Page request works
dcbService.createTransactions(ecsTlr, updatedRequest);
ecsTlrRepository.save(ecsTlr);
log.info("processItemIdUpdate: ECS TLR {} is updated", ecsTlr::getId);
}

private static Optional<TransactionStatus.StatusEnum> determineNewTransactionStatus(
KafkaEvent<Request> event) {

final Request.StatusEnum oldRequestStatus = event.getData().getOldVersion().getStatus();
final Request.StatusEnum newRequestStatus = event.getData().getNewVersion().getStatus();
log.info("determineNewTransactionStatus:: oldRequestStatus='{}', newRequestStatus='{}'",
oldRequestStatus, newRequestStatus);

if (newRequestStatus == oldRequestStatus) {
log.info("determineNewTransactionStatus:: request status did not change");
return Optional.empty();
}

var newTransactionStatus = Optional.ofNullable(
switch (newRequestStatus) {
case OPEN_IN_TRANSIT -> OPEN;
case OPEN_AWAITING_PICKUP -> AWAITING_PICKUP;
case CLOSED_FILLED -> ITEM_CHECKED_OUT;
case CLOSED_CANCELLED -> CANCELLED;
default -> null;
});

newTransactionStatus.ifPresentOrElse(
ts -> log.info("determineNewTransactionStatus:: new transaction status: {}", ts),
() -> log.info("determineNewTransactionStatus:: irrelevant request status change"));

return newTransactionStatus;
}

private void updateTransactionStatuses(KafkaEvent<Request> event, EcsTlrEntity ecsTlr) {
determineNewTransactionStatus(event)
.ifPresent(newStatus -> updateTransactionStatuses(newStatus, ecsTlr));
}

private void updateTransactionStatuses(TransactionStatus.StatusEnum newStatus, EcsTlrEntity ecsTlr) {
log.info("updateTransactionStatuses:: updating primary transaction status to {}", newStatus::getValue);
updateTransactionStatus(ecsTlr.getPrimaryRequestDcbTransactionId(), newStatus,
ecsTlr.getPrimaryRequestTenantId());

log.info("updateTransactionStatuses:: updating intermediate transaction status to {}", newStatus::getValue);
updateTransactionStatus(ecsTlr.getIntermediateRequestDcbTransactionId(), newStatus,
ecsTlr.getIntermediateRequestTenantId());

log.info("updateTransactionStatuses:: updating secondary transaction status to {}", newStatus::getValue);
updateTransactionStatus(ecsTlr.getSecondaryRequestDcbTransactionId(), newStatus,
ecsTlr.getSecondaryRequestTenantId());
}

private void updateTransactionStatus(UUID transactionId,
TransactionStatus.StatusEnum newStatus, String tenantId) {

if (transactionId == null) {
log.info("updateTransactionStatus:: transaction ID is null, doing nothing");
return;
}
if (tenantId == null) {
log.info("updateTransactionStatus:: tenant ID is null, doing nothing");
return;
}

try {
var currentStatus = dcbService.getTransactionStatus(transactionId, tenantId).getStatus();
log.info("updateTransactionStatus:: current transaction status: {}", currentStatus);
if (newStatus.getValue().equals(currentStatus.getValue())) {
log.info("updateTransactionStatus:: transaction status did not change, doing nothing");
return;
}
log.info("updateTransactionStatus: changing status of transaction {} in tenant {} from {} to {}",
transactionId, tenantId, currentStatus.getValue(), newStatus.getValue());
dcbService.updateTransactionStatus(transactionId, newStatus, tenantId);
} catch (FeignException.NotFound e) {
log.error("updateTransactionStatus:: transaction {} not found: {}", transactionId, e.getMessage());
} catch (Exception e) {
log.error("updateTransactionStatus:: failed to update transaction status: {}", e::getMessage);
log.debug("updateTransactionStatus:: ", e);
}
}

private void propagateChangesFromPrimaryToSecondaryRequest(EcsTlrEntity ecsTlr,
KafkaEvent<Request> event) {

String secondaryRequestId = ecsTlr.getSecondaryRequestId().toString();
String secondaryRequestTenantId = ecsTlr.getSecondaryRequestTenantId();
Request primaryRequest = event.getData().getNewVersion();
Request secondaryRequest = requestService.getRequestFromStorage(
secondaryRequestId, secondaryRequestTenantId);

boolean shouldUpdateSecondaryRequest = false;
if (valueIsNotEqual(primaryRequest, secondaryRequest, Request::getRequestExpirationDate)) {
Date requestExpirationDate = primaryRequest.getRequestExpirationDate();
log.info("propagateChangesFromPrimaryToSecondaryRequest:: request expiration date changed: {}",
requestExpirationDate);
secondaryRequest.setRequestExpirationDate(requestExpirationDate);
shouldUpdateSecondaryRequest = true;
}
if (valueIsNotEqual(primaryRequest, secondaryRequest, Request::getFulfillmentPreference)) {
FulfillmentPreferenceEnum fulfillmentPreference = primaryRequest.getFulfillmentPreference();
log.info("propagateChangesFromPrimaryToSecondaryRequest:: fulfillment preference changed: {}",
fulfillmentPreference);
secondaryRequest.setFulfillmentPreference(fulfillmentPreference);
shouldUpdateSecondaryRequest = true;
}
if (valueIsNotEqual(primaryRequest, secondaryRequest, Request::getPickupServicePointId)) {
String pickupServicePointId = primaryRequest.getPickupServicePointId();
log.info("propagateChangesFromPrimaryToSecondaryRequest:: pickup service point ID changed: {}",
pickupServicePointId);
secondaryRequest.setPickupServicePointId(pickupServicePointId);
shouldUpdateSecondaryRequest = true;
clonePickupServicePoint(ecsTlr, pickupServicePointId);
}

if (!shouldUpdateSecondaryRequest) {
log.info("propagateChangesFromPrimaryToSecondaryRequest:: no relevant changes detected");
return;
}

log.info("propagateChangesFromPrimaryToSecondaryRequest:: updating secondary request");
requestService.updateRequestInStorage(secondaryRequest, secondaryRequestTenantId);
log.info("propagateChangesFromPrimaryToSecondaryRequest:: secondary request updated");
}

private void clonePickupServicePoint(EcsTlrEntity ecsTlr, String pickupServicePointId) {
if (pickupServicePointId == null) {
log.info("clonePickupServicePoint:: pickupServicePointId is null, doing nothing");
return;
}
log.info("clonePickupServicePoint:: ensuring that service point {} exists in lending tenant",
pickupServicePointId);
ServicePoint pickupServicePoint = executionService.executeSystemUserScoped(
ecsTlr.getPrimaryRequestTenantId(), () -> servicePointService.find(pickupServicePointId));
executionService.executeSystemUserScoped(ecsTlr.getSecondaryRequestTenantId(),
() -> servicePointCloningService.clone(pickupServicePoint));
}

private static <T, V> boolean valueIsNotEqual(T o1, T o2, Function<T, V> valueExtractor) {
return !Objects.equals(valueExtractor.apply(o1), valueExtractor.apply(o2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ public RequestWrapper createIntermediateRequest(Request intermediateRequest,
Request request = circulationClient.createRequest(intermediateRequest);
log.info("createIntermediateRequest:: intermediate request {} created in tenant {}",
request.getId(), intermediateRequestTenantId);
log.info("createIntermediateRequest:: intermediate request: {}", () -> request);

updateCirculationItemOnRequestCreation(circItem, request);

Expand Down
Loading

0 comments on commit 2c930aa

Please sign in to comment.