From 7096a98f6934a5c901595b74750123aaf5d18a3c Mon Sep 17 00:00:00 2001 From: Roman_Barannyk Date: Sat, 16 Nov 2024 18:26:21 +0200 Subject: [PATCH] MODTLR-64 consume and handle user update events --- .../listener/kafka/KafkaEventListener.java | 19 ++++- .../service/impl/AbstractEventHandler.java | 47 ++++++++++++ .../folio/service/impl/UserEventHandler.java | 36 +++++++++ .../service/impl/UserGroupEventHandler.java | 73 ++++--------------- .../listener/KafkaEventListenerTest.java | 6 +- 5 files changed, 121 insertions(+), 60 deletions(-) create mode 100644 src/main/java/org/folio/service/impl/AbstractEventHandler.java create mode 100644 src/main/java/org/folio/service/impl/UserEventHandler.java diff --git a/src/main/java/org/folio/listener/kafka/KafkaEventListener.java b/src/main/java/org/folio/listener/kafka/KafkaEventListener.java index 897b6e08..71d78438 100644 --- a/src/main/java/org/folio/listener/kafka/KafkaEventListener.java +++ b/src/main/java/org/folio/listener/kafka/KafkaEventListener.java @@ -7,11 +7,13 @@ import org.folio.domain.dto.Request; import org.folio.domain.dto.RequestsBatchUpdate; +import org.folio.domain.dto.User; import org.folio.domain.dto.UserGroup; import org.folio.exception.KafkaEventDeserializationException; import org.folio.service.KafkaEventHandler; import org.folio.service.impl.RequestBatchUpdateEventHandler; import org.folio.service.impl.RequestEventHandler; +import org.folio.service.impl.UserEventHandler; import org.folio.service.impl.UserGroupEventHandler; import org.folio.spring.integration.XOkapiHeaders; import org.folio.spring.service.SystemUserScopedExecutionService; @@ -33,18 +35,21 @@ public class KafkaEventListener { private static final ObjectMapper objectMapper = new ObjectMapper(); private final RequestEventHandler requestEventHandler; private final UserGroupEventHandler userGroupEventHandler; + private final UserEventHandler userEventHandler; private final SystemUserScopedExecutionService systemUserScopedExecutionService; private final RequestBatchUpdateEventHandler requestBatchEventHandler; public KafkaEventListener(@Autowired RequestEventHandler requestEventHandler, @Autowired RequestBatchUpdateEventHandler requestBatchEventHandler, @Autowired SystemUserScopedExecutionService systemUserScopedExecutionService, - @Autowired UserGroupEventHandler userGroupEventHandler) { + @Autowired UserGroupEventHandler userGroupEventHandler, + @Autowired UserEventHandler userEventHandler) { this.requestEventHandler = requestEventHandler; this.systemUserScopedExecutionService = systemUserScopedExecutionService; this.userGroupEventHandler = userGroupEventHandler; this.requestBatchEventHandler = requestBatchEventHandler; + this.userEventHandler = userEventHandler; } @KafkaListener( @@ -92,6 +97,18 @@ public void handleUserGroupEvent(String eventString, MessageHeaders messageHeade handleEvent(event, userGroupEventHandler); } + @KafkaListener( + topicPattern = "${folio.environment}\\.\\w+\\.users\\.users", + groupId = "${spring.kafka.consumer.group-id}" + ) + public void handleUserEvent(String eventString, MessageHeaders messageHeaders) { + KafkaEvent event = deserialize(eventString, messageHeaders, User.class); + + log.info("handleUserEvent:: event received: {}", event::getId); + log.info("handleUserEvent:: event: {}", () -> event); + handleEvent(event, userEventHandler); + } + private static KafkaEvent deserialize(String eventString, MessageHeaders messageHeaders, Class dataType) { diff --git a/src/main/java/org/folio/service/impl/AbstractEventHandler.java b/src/main/java/org/folio/service/impl/AbstractEventHandler.java new file mode 100644 index 00000000..752b1f14 --- /dev/null +++ b/src/main/java/org/folio/service/impl/AbstractEventHandler.java @@ -0,0 +1,47 @@ +package org.folio.service.impl; + +import java.util.function.Consumer; + +import org.folio.domain.dto.UserTenant; +import org.folio.service.ConsortiaService; +import org.folio.service.KafkaEventHandler; +import org.folio.service.UserTenantsService; +import org.folio.spring.service.SystemUserScopedExecutionService; +import org.folio.support.KafkaEvent; + +import lombok.AllArgsConstructor; +import lombok.extern.log4j.Log4j2; + +@AllArgsConstructor +@Log4j2 +public abstract class AbstractEventHandler implements KafkaEventHandler { + + protected final UserTenantsService userTenantsService; + protected final ConsortiaService consortiaService; + protected final SystemUserScopedExecutionService systemUserScopedExecutionService; + + protected void processEvent(KafkaEvent event, Consumer eventConsumer) { + log.debug("processEvent:: params: event={}", () -> event); + UserTenant firstUserTenant = userTenantsService.findFirstUserTenant(); + if (firstUserTenant == null) { + log.info("processEvent: Failed to get user-tenants info"); + return; + } + String consortiumId = firstUserTenant.getConsortiumId(); + String centralTenantId = firstUserTenant.getCentralTenantId(); + log.info("processEvent:: consortiumId: {}, centralTenantId: {}", consortiumId, centralTenantId); + + if (!centralTenantId.equals(event.getTenantIdHeaderValue())) { + log.info("processEvent: Ignoring non-central tenant event"); + return; + } + processForAllDataTenants(consortiumId, () -> eventConsumer.accept(event.getData().getNewVersion())); + } + + private void processForAllDataTenants(String consortiumId, Runnable action) { + log.debug("processForAllDataTenants:: params: consortiumId={}", consortiumId); + consortiaService.getAllConsortiumTenants(consortiumId).getTenants().stream() + .filter(tenant -> !tenant.getIsCentral()) + .forEach(tenant -> systemUserScopedExecutionService.executeAsyncSystemUserScoped(tenant.getId(), action)); + } +} diff --git a/src/main/java/org/folio/service/impl/UserEventHandler.java b/src/main/java/org/folio/service/impl/UserEventHandler.java new file mode 100644 index 00000000..a06547fc --- /dev/null +++ b/src/main/java/org/folio/service/impl/UserEventHandler.java @@ -0,0 +1,36 @@ +package org.folio.service.impl; + +import org.folio.domain.dto.User; +import org.folio.service.ConsortiaService; +import org.folio.service.UserService; +import org.folio.service.UserTenantsService; +import org.folio.spring.service.SystemUserScopedExecutionService; +import org.folio.support.KafkaEvent; +import org.springframework.stereotype.Service; + +import lombok.extern.log4j.Log4j2; + +@Log4j2 +@Service +public class UserEventHandler extends AbstractEventHandler { + + private final UserService userService; + + public UserEventHandler(UserTenantsService userTenantsService, + ConsortiaService consortiaService, + SystemUserScopedExecutionService systemUserScopedExecutionService, + UserService userService) { + + super(userTenantsService, consortiaService, systemUserScopedExecutionService); + this.userService = userService; + } + + @Override + public void handle(KafkaEvent event) { + log.info("handle:: Processing user event: {}", () -> event); + if (event.getType() == KafkaEvent.EventType.UPDATED) { + processEvent(event, userService::update); + } + } +} + diff --git a/src/main/java/org/folio/service/impl/UserGroupEventHandler.java b/src/main/java/org/folio/service/impl/UserGroupEventHandler.java index e2b12297..f1ca43e4 100644 --- a/src/main/java/org/folio/service/impl/UserGroupEventHandler.java +++ b/src/main/java/org/folio/service/impl/UserGroupEventHandler.java @@ -1,80 +1,37 @@ package org.folio.service.impl; -import java.util.function.Consumer; - import org.folio.domain.dto.UserGroup; -import org.folio.domain.dto.UserTenant; import org.folio.service.ConsortiaService; -import org.folio.service.KafkaEventHandler; import org.folio.service.UserGroupService; import org.folio.service.UserTenantsService; import org.folio.spring.service.SystemUserScopedExecutionService; import org.folio.support.KafkaEvent; import org.springframework.stereotype.Service; -import lombok.AllArgsConstructor; import lombok.extern.log4j.Log4j2; -@AllArgsConstructor -@Service @Log4j2 -public class UserGroupEventHandler implements KafkaEventHandler { +@Service +public class UserGroupEventHandler extends AbstractEventHandler { - private final UserTenantsService userTenantsService; - private final ConsortiaService consortiaService; - private final SystemUserScopedExecutionService systemUserScopedExecutionService; private final UserGroupService userGroupService; - @Override - public void handle(KafkaEvent event) { - log.info("handle:: Processing user group event: {}", () -> event); - - KafkaEvent.EventType eventType = event.getType(); - if (eventType == KafkaEvent.EventType.CREATED) { - processUserGroupCreateEvent(event); - } - if (eventType == KafkaEvent.EventType.UPDATED) { - processUserGroupUpdateEvent(event); - } - } - - private void processUserGroupCreateEvent(KafkaEvent event){ - log.debug("processUserGroupCreateEvent:: params: event={}", () -> event); - processUserGroupEvent(event, userGroupService::create); - } + public UserGroupEventHandler(UserTenantsService userTenantsService, + ConsortiaService consortiaService, + SystemUserScopedExecutionService systemUserScopedExecutionService, + UserGroupService userGroupService) { - private void processUserGroupUpdateEvent(KafkaEvent event) { - log.debug("processUserGroupUpdateEvent:: params: event={}", () -> event); - processUserGroupEvent(event, userGroupService::update); + super(userTenantsService, consortiaService, systemUserScopedExecutionService); + this.userGroupService = userGroupService; } - private void processUserGroupEvent(KafkaEvent event, - Consumer userGroupConsumer) { - - log.debug("processUserGroupEvent:: params: event={}", () -> event); - UserTenant firstUserTenant = userTenantsService.findFirstUserTenant(); - if (firstUserTenant == null) { - log.info("processUserGroupEvent: Failed to get user-tenants info"); - return; - } - String consortiumId = firstUserTenant.getConsortiumId(); - String centralTenantId = firstUserTenant.getCentralTenantId(); - log.info("processUserGroupEvent:: consortiumId: {}, centralTenantId: {}", - consortiumId, centralTenantId); - - if (!centralTenantId.equals(event.getTenantIdHeaderValue())) { - log.info("processUserGroupEvent: Ignoring non-central tenant event"); - return; + @Override + public void handle(KafkaEvent event){ + log.info("handle:: Processing user group event: {}", () -> event); + if (event.getType() == KafkaEvent.EventType.CREATED) { + processEvent(event, userGroupService::create); + } else if (event.getType() == KafkaEvent.EventType.UPDATED) { + processEvent(event, userGroupService::update); } - processUserGroupForAllDataTenants(consortiumId, - () -> userGroupConsumer.accept(event.getData().getNewVersion())); - } - - private void processUserGroupForAllDataTenants(String consortiumId, Runnable action) { - log.debug("processUserGroupForAllDataTenants:: params: consortiumId={}", consortiumId); - consortiaService.getAllConsortiumTenants(consortiumId).getTenants().stream() - .filter(tenant -> !tenant.getIsCentral()) - .forEach(tenant -> systemUserScopedExecutionService.executeAsyncSystemUserScoped( - tenant.getId(), action)); } } diff --git a/src/test/java/org/folio/listener/KafkaEventListenerTest.java b/src/test/java/org/folio/listener/KafkaEventListenerTest.java index 2d9be08d..759f8272 100644 --- a/src/test/java/org/folio/listener/KafkaEventListenerTest.java +++ b/src/test/java/org/folio/listener/KafkaEventListenerTest.java @@ -10,6 +10,7 @@ import org.folio.listener.kafka.KafkaEventListener; import org.folio.service.impl.RequestBatchUpdateEventHandler; import org.folio.service.impl.RequestEventHandler; +import org.folio.service.impl.UserEventHandler; import org.folio.service.impl.UserGroupEventHandler; import org.folio.spring.service.SystemUserScopedExecutionService; import org.junit.jupiter.api.Test; @@ -28,13 +29,16 @@ class KafkaEventListenerTest { SystemUserScopedExecutionService systemUserScopedExecutionService; @Mock UserGroupEventHandler userGroupEventHandler; + @Mock + UserEventHandler userEventHandler; @Test void shouldHandleExceptionInEventHandler() { doThrow(new NullPointerException("NPE")).when(systemUserScopedExecutionService) .executeAsyncSystemUserScoped(any(), any()); KafkaEventListener kafkaEventListener = new KafkaEventListener(requestEventHandler, - requestBatchEventHandler, systemUserScopedExecutionService, userGroupEventHandler); + requestBatchEventHandler, systemUserScopedExecutionService, userGroupEventHandler, + userEventHandler); kafkaEventListener.handleRequestEvent("{}", new MessageHeaders(Map.of(TENANT, "default".getBytes())));