Skip to content

Commit

Permalink
MODTLR-64 consume and handle user update events
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-barannyk committed Nov 16, 2024
1 parent 21e44eb commit 7096a98
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 60 deletions.
19 changes: 18 additions & 1 deletion src/main/java/org/folio/listener/kafka/KafkaEventListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

import org.folio.domain.dto.Request;
import org.folio.domain.dto.RequestsBatchUpdate;
import org.folio.domain.dto.User;
import org.folio.domain.dto.UserGroup;
import org.folio.exception.KafkaEventDeserializationException;
import org.folio.service.KafkaEventHandler;
import org.folio.service.impl.RequestBatchUpdateEventHandler;
import org.folio.service.impl.RequestEventHandler;
import org.folio.service.impl.UserEventHandler;
import org.folio.service.impl.UserGroupEventHandler;
import org.folio.spring.integration.XOkapiHeaders;
import org.folio.spring.service.SystemUserScopedExecutionService;
Expand All @@ -33,18 +35,21 @@ public class KafkaEventListener {
private static final ObjectMapper objectMapper = new ObjectMapper();
private final RequestEventHandler requestEventHandler;
private final UserGroupEventHandler userGroupEventHandler;
private final UserEventHandler userEventHandler;
private final SystemUserScopedExecutionService systemUserScopedExecutionService;
private final RequestBatchUpdateEventHandler requestBatchEventHandler;

public KafkaEventListener(@Autowired RequestEventHandler requestEventHandler,
@Autowired RequestBatchUpdateEventHandler requestBatchEventHandler,
@Autowired SystemUserScopedExecutionService systemUserScopedExecutionService,
@Autowired UserGroupEventHandler userGroupEventHandler) {
@Autowired UserGroupEventHandler userGroupEventHandler,
@Autowired UserEventHandler userEventHandler) {

this.requestEventHandler = requestEventHandler;
this.systemUserScopedExecutionService = systemUserScopedExecutionService;
this.userGroupEventHandler = userGroupEventHandler;
this.requestBatchEventHandler = requestBatchEventHandler;
this.userEventHandler = userEventHandler;
}

@KafkaListener(
Expand Down Expand Up @@ -92,6 +97,18 @@ public void handleUserGroupEvent(String eventString, MessageHeaders messageHeade
handleEvent(event, userGroupEventHandler);
}

@KafkaListener(
topicPattern = "${folio.environment}\\.\\w+\\.users\\.users",
groupId = "${spring.kafka.consumer.group-id}"
)
public void handleUserEvent(String eventString, MessageHeaders messageHeaders) {
KafkaEvent<User> event = deserialize(eventString, messageHeaders, User.class);

log.info("handleUserEvent:: event received: {}", event::getId);
log.info("handleUserEvent:: event: {}", () -> event);
handleEvent(event, userEventHandler);
}

private static <T> KafkaEvent<T> deserialize(String eventString, MessageHeaders messageHeaders,
Class<T> dataType) {

Expand Down
47 changes: 47 additions & 0 deletions src/main/java/org/folio/service/impl/AbstractEventHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.folio.service.impl;

import java.util.function.Consumer;

import org.folio.domain.dto.UserTenant;
import org.folio.service.ConsortiaService;
import org.folio.service.KafkaEventHandler;
import org.folio.service.UserTenantsService;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.folio.support.KafkaEvent;

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;

@AllArgsConstructor
@Log4j2
public abstract class AbstractEventHandler<T> implements KafkaEventHandler<T> {

protected final UserTenantsService userTenantsService;
protected final ConsortiaService consortiaService;
protected final SystemUserScopedExecutionService systemUserScopedExecutionService;

protected void processEvent(KafkaEvent<T> event, Consumer<T> eventConsumer) {
log.debug("processEvent:: params: event={}", () -> event);
UserTenant firstUserTenant = userTenantsService.findFirstUserTenant();
if (firstUserTenant == null) {
log.info("processEvent: Failed to get user-tenants info");
return;
}
String consortiumId = firstUserTenant.getConsortiumId();
String centralTenantId = firstUserTenant.getCentralTenantId();
log.info("processEvent:: consortiumId: {}, centralTenantId: {}", consortiumId, centralTenantId);

if (!centralTenantId.equals(event.getTenantIdHeaderValue())) {
log.info("processEvent: Ignoring non-central tenant event");
return;
}
processForAllDataTenants(consortiumId, () -> eventConsumer.accept(event.getData().getNewVersion()));
}

private void processForAllDataTenants(String consortiumId, Runnable action) {
log.debug("processForAllDataTenants:: params: consortiumId={}", consortiumId);
consortiaService.getAllConsortiumTenants(consortiumId).getTenants().stream()
.filter(tenant -> !tenant.getIsCentral())
.forEach(tenant -> systemUserScopedExecutionService.executeAsyncSystemUserScoped(tenant.getId(), action));
}
}
36 changes: 36 additions & 0 deletions src/main/java/org/folio/service/impl/UserEventHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.folio.service.impl;

import org.folio.domain.dto.User;
import org.folio.service.ConsortiaService;
import org.folio.service.UserService;
import org.folio.service.UserTenantsService;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.folio.support.KafkaEvent;
import org.springframework.stereotype.Service;

import lombok.extern.log4j.Log4j2;

@Log4j2
@Service
public class UserEventHandler extends AbstractEventHandler<User> {

private final UserService userService;

public UserEventHandler(UserTenantsService userTenantsService,
ConsortiaService consortiaService,
SystemUserScopedExecutionService systemUserScopedExecutionService,
UserService userService) {

super(userTenantsService, consortiaService, systemUserScopedExecutionService);
this.userService = userService;
}

@Override
public void handle(KafkaEvent<User> event) {
log.info("handle:: Processing user event: {}", () -> event);
if (event.getType() == KafkaEvent.EventType.UPDATED) {
processEvent(event, userService::update);
}
}
}

73 changes: 15 additions & 58 deletions src/main/java/org/folio/service/impl/UserGroupEventHandler.java
Original file line number Diff line number Diff line change
@@ -1,80 +1,37 @@
package org.folio.service.impl;

import java.util.function.Consumer;

import org.folio.domain.dto.UserGroup;
import org.folio.domain.dto.UserTenant;
import org.folio.service.ConsortiaService;
import org.folio.service.KafkaEventHandler;
import org.folio.service.UserGroupService;
import org.folio.service.UserTenantsService;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.folio.support.KafkaEvent;
import org.springframework.stereotype.Service;

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;

@AllArgsConstructor
@Service
@Log4j2
public class UserGroupEventHandler implements KafkaEventHandler<UserGroup> {
@Service
public class UserGroupEventHandler extends AbstractEventHandler<UserGroup> {

private final UserTenantsService userTenantsService;
private final ConsortiaService consortiaService;
private final SystemUserScopedExecutionService systemUserScopedExecutionService;
private final UserGroupService userGroupService;

@Override
public void handle(KafkaEvent<UserGroup> event) {
log.info("handle:: Processing user group event: {}", () -> event);

KafkaEvent.EventType eventType = event.getType();
if (eventType == KafkaEvent.EventType.CREATED) {
processUserGroupCreateEvent(event);
}
if (eventType == KafkaEvent.EventType.UPDATED) {
processUserGroupUpdateEvent(event);
}
}

private void processUserGroupCreateEvent(KafkaEvent<UserGroup> event){
log.debug("processUserGroupCreateEvent:: params: event={}", () -> event);
processUserGroupEvent(event, userGroupService::create);
}
public UserGroupEventHandler(UserTenantsService userTenantsService,
ConsortiaService consortiaService,
SystemUserScopedExecutionService systemUserScopedExecutionService,
UserGroupService userGroupService) {

private void processUserGroupUpdateEvent(KafkaEvent<UserGroup> event) {
log.debug("processUserGroupUpdateEvent:: params: event={}", () -> event);
processUserGroupEvent(event, userGroupService::update);
super(userTenantsService, consortiaService, systemUserScopedExecutionService);
this.userGroupService = userGroupService;
}

private void processUserGroupEvent(KafkaEvent<UserGroup> event,
Consumer<UserGroup> userGroupConsumer) {

log.debug("processUserGroupEvent:: params: event={}", () -> event);
UserTenant firstUserTenant = userTenantsService.findFirstUserTenant();
if (firstUserTenant == null) {
log.info("processUserGroupEvent: Failed to get user-tenants info");
return;
}
String consortiumId = firstUserTenant.getConsortiumId();
String centralTenantId = firstUserTenant.getCentralTenantId();
log.info("processUserGroupEvent:: consortiumId: {}, centralTenantId: {}",
consortiumId, centralTenantId);

if (!centralTenantId.equals(event.getTenantIdHeaderValue())) {
log.info("processUserGroupEvent: Ignoring non-central tenant event");
return;
@Override
public void handle(KafkaEvent<UserGroup> event){
log.info("handle:: Processing user group event: {}", () -> event);
if (event.getType() == KafkaEvent.EventType.CREATED) {
processEvent(event, userGroupService::create);
} else if (event.getType() == KafkaEvent.EventType.UPDATED) {
processEvent(event, userGroupService::update);
}
processUserGroupForAllDataTenants(consortiumId,
() -> userGroupConsumer.accept(event.getData().getNewVersion()));
}

private void processUserGroupForAllDataTenants(String consortiumId, Runnable action) {
log.debug("processUserGroupForAllDataTenants:: params: consortiumId={}", consortiumId);
consortiaService.getAllConsortiumTenants(consortiumId).getTenants().stream()
.filter(tenant -> !tenant.getIsCentral())
.forEach(tenant -> systemUserScopedExecutionService.executeAsyncSystemUserScoped(
tenant.getId(), action));
}
}
6 changes: 5 additions & 1 deletion src/test/java/org/folio/listener/KafkaEventListenerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.folio.listener.kafka.KafkaEventListener;
import org.folio.service.impl.RequestBatchUpdateEventHandler;
import org.folio.service.impl.RequestEventHandler;
import org.folio.service.impl.UserEventHandler;
import org.folio.service.impl.UserGroupEventHandler;
import org.folio.spring.service.SystemUserScopedExecutionService;
import org.junit.jupiter.api.Test;
Expand All @@ -28,13 +29,16 @@ class KafkaEventListenerTest {
SystemUserScopedExecutionService systemUserScopedExecutionService;
@Mock
UserGroupEventHandler userGroupEventHandler;
@Mock
UserEventHandler userEventHandler;

@Test
void shouldHandleExceptionInEventHandler() {
doThrow(new NullPointerException("NPE")).when(systemUserScopedExecutionService)
.executeAsyncSystemUserScoped(any(), any());
KafkaEventListener kafkaEventListener = new KafkaEventListener(requestEventHandler,
requestBatchEventHandler, systemUserScopedExecutionService, userGroupEventHandler);
requestBatchEventHandler, systemUserScopedExecutionService, userGroupEventHandler,
userEventHandler);
kafkaEventListener.handleRequestEvent("{}",
new MessageHeaders(Map.of(TENANT, "default".getBytes())));

Expand Down

0 comments on commit 7096a98

Please sign in to comment.