Skip to content

Commit

Permalink
feat(specification-storage): consume specification update request eve…
Browse files Browse the repository at this point in the history
…nts (#78)

Closes: MRSPECS-60
  • Loading branch information
psmagin authored Oct 2, 2024
1 parent 7450014 commit 7737c8b
Show file tree
Hide file tree
Showing 37 changed files with 878 additions and 87 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Restrict creating indicators/subfields for control fields ([MRSPECS-39](https://folio-org.atlassian.net/browse/MRSPECS-39))
* Implement GET specification by id endpoint ([MRSPECS-50](https://folio-org.atlassian.net/browse/MRSPECS-50))
* Sync specifications on new tenant creation ([MRSPECS-53](https://folio-org.atlassian.net/browse/MRSPECS-53))
* Consume specification update request events ([MRSPECS-60](https://folio-org.atlassian.net/browse/MRSPECS-60))

#### Validator
* Implement validation by MARC specification ([MRSPECS-38](https://folio-org.atlassian.net/browse/MRSPECS-38))
Expand Down
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ Below are the environment variables used by this module:
| KAFKA_SSL_KEYSTORE_PASSWORD | - | The store password for the Kafka key store file. This is optional for client and only needed if 'ssl.keystore.location' is configured. |
| KAFKA_SSL_TRUSTSTORE_LOCATION | - | The location of the Kafka trust store file. |
| KAFKA_SSL_TRUSTSTORE_PASSWORD | - | The password for the Kafka trust store file. If a password is not set, trust store file configured will still be used, but integrity checking is disabled. |
| KAFKA_SPECIFICATION_UPDATE_TOPIC_PARTITIONS | 1 | Amount of partitions for `specification-storage.specification.updated` topic. |
| KAFKA_SPECIFICATION_UPDATE_TOPIC_REPLICATION_FACTOR | - | Replication factor for `specification-storage.specification.updated` topic. |

Change these variables as per your requirements.

Expand Down
10 changes: 0 additions & 10 deletions descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -459,16 +459,6 @@
"name": "KAFKA_SSL_TRUSTSTORE_PASSWORD",
"value": "",
"description": "The password for the Kafka trust store file. If a password is not set, trust store file configured will still be used, but integrity checking is disabled."
},
{
"name": "KAFKA_SPECIFICATION_UPDATE_TOPIC_PARTITIONS",
"value": "1",
"description": "Amount of partitions for `specification-storage.specification.updated` topic."
},
{
"name": "KAFKA_SPECIFICATION_UPDATE_TOPIC_REPLICATION_FACTOR",
"value": "",
"description": "Replication factor for `specification-storage.specification.updated` topic."
}
]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.folio.rspec.domain.dto;

import lombok.Data;
import lombok.EqualsAndHashCode;

/**
 * Update request event payload targeting a subfield definition.
 *
 * <p>Deserialized by Jackson when the parent {@link UpdateRequestEvent}'s
 * {@code definitionType} property is {@code SUBFIELD}.
 */
@Data
@EqualsAndHashCode(callSuper = true)
public class SubfieldUpdateRequestEvent extends UpdateRequestEvent {

  // Tag of the field definition the subfield belongs to (e.g. a MARC tag).
  private String targetFieldTag;
  // The subfield payload to apply.
  private SubfieldDto subfield;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.folio.rspec.domain.dto;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import lombok.Data;

/**
 * Base payload for specification update request events consumed from Kafka.
 *
 * <p>Jackson resolves the concrete subtype from the {@code definitionType}
 * JSON property; currently {@code SUBFIELD} maps to
 * {@link SubfieldUpdateRequestEvent}.
 */
@Data
@JsonTypeInfo(
  use = Id.NAME,
  property = "definitionType"
)
@JsonSubTypes({
  @Type(value = SubfieldUpdateRequestEvent.class, name = "SUBFIELD")
})
public abstract class UpdateRequestEvent {

  // Identify the target specification (family + profile) and the kind of definition to update.
  private Family family;
  private FamilyProfile profile;
  private DefinitionType definitionType;

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,30 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.folio.rspec.domain.dto.SpecificationUpdatedEvent;
import org.folio.rspec.domain.dto.UpdateRequestEvent;
import org.jetbrains.annotations.NotNull;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.backoff.FixedBackOff;

/**
* Responsible for Kafka configuration.
*/
@Log4j2
@Configuration
@RequiredArgsConstructor
public class KafkaConfiguration {
Expand Down Expand Up @@ -46,8 +57,32 @@ public KafkaTemplate<String, SpecificationUpdatedEvent> specificationChangeChang
return new KafkaTemplate<>(factory);
}

  /**
   * Builds the consumer factory for update request events: String keys and
   * JSON values deserialized into {@link UpdateRequestEvent} (concrete subtype
   * resolved by Jackson type info on the event class).
   *
   * @param kafkaProperties Spring Boot Kafka properties used for base consumer config
   * @return consumer factory for update request event listeners
   */
  @Bean
  public ConsumerFactory<String, UpdateRequestEvent> consumerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(null),
      new StringDeserializer(),
      new JsonDeserializer<>(UpdateRequestEvent.class));
  }

@Bean("updateRequestEventListenerFactory")
public ConcurrentKafkaListenerContainerFactory<String, UpdateRequestEvent> listenerFactory(
ConsumerFactory<String, UpdateRequestEvent> factory) {
var listenerFactory = new ConcurrentKafkaListenerContainerFactory<String, UpdateRequestEvent>();
listenerFactory.setConsumerFactory(factory);
listenerFactory.setCommonErrorHandler(listenerErrorHandler());
return listenerFactory;
}

private @NotNull DefaultErrorHandler listenerErrorHandler() {
return new DefaultErrorHandler(
(thrownException, event) -> log.error("Error in event processing [exception={}, event={}",
thrownException, event),
new FixedBackOff(2000L, 3L));
}

  /**
   * Builds a producer factory with String keys and JSON-serialized values of
   * type {@code T}, reusing the module's shared {@code objectMapper} so
   * serialization settings stay consistent across producers.
   *
   * @param kafkaProperties Spring Boot Kafka properties used for base producer config
   * @return producer factory for JSON-valued Kafka templates
   */
  private <T> ProducerFactory<String, T> getProducerConfigProps(KafkaProperties kafkaProperties) {
    return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties(null),
      new StringSerializer(), new JsonSerializer<>(objectMapper));
  }

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.folio.rspec.domain.repository;

import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.folio.rspec.domain.entity.Field;
import org.springframework.data.jpa.repository.JpaRepository;
Expand All @@ -20,4 +21,7 @@ public interface FieldRepository extends JpaRepository<Field, UUID> {
  // Bulk-deletes all fields belonging to the given specification in a single JPQL statement.
  @Modifying
  @Query("delete from Field f where f.specification.id = ?1")
  void deleteBySpecificationId(UUID specificationId);

  // Looks up a single field by its owning specification id and its tag.
  // NOTE(review): assumes (specification_id, tag) is unique — a duplicate pair would make
  // the Optional-returning query fail at runtime; confirm against the schema constraints.
  @Query("select f from Field f where f.specification.id = ?1 and f.tag = ?2")
  Optional<Field> findBySpecificationIdAndTag(UUID specificationId, String targetFieldTag);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package org.folio.rspec.exception;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import lombok.Getter;
import org.apache.commons.lang3.tuple.Pair;

/**
* This class represents a custom exception indicating that a resource was not found.
Expand All @@ -9,15 +13,25 @@
@Getter
public class ResourceNotFoundException extends RuntimeException {

private static final String MSG_TEMPLATE = "%s with ID [%s] was not found";
private static final String MSG_BY_ID_TEMPLATE = "%s with ID [%s] was not found";
private static final String MSG_BY_PARAMS_TEMPLATE = "%s with params [%s] was not found";

private final transient Resource resource;
private final transient Object id;
private final transient List<Pair<String, Object>> searchableParams;

  /**
   * Creates an exception for a resource looked up by its identifier.
   *
   * @param resource the type of resource that was not found
   * @param id the identifier used for the lookup (included in the message)
   */
  protected ResourceNotFoundException(Resource resource, Object id) {
    super(String.format(MSG_BY_ID_TEMPLATE, resource.getName(), id));
    this.resource = resource;
    this.id = id;
    // No search params for id-based lookups; kept non-null so getters never return null.
    this.searchableParams = new ArrayList<>();
  }

  /**
   * Creates an exception for a resource looked up by a set of named search
   * parameters (key/value pairs), e.g. specificationId + tag.
   *
   * @param resource the type of resource that was not found
   * @param searchableParams the lookup parameters (rendered into the message)
   */
  protected ResourceNotFoundException(Resource resource, List<Pair<String, Object>> searchableParams) {
    super(String.format(MSG_BY_PARAMS_TEMPLATE, resource.getName(), searchableParams));
    this.resource = resource;
    // NOTE(review): stored as-is, not defensively copied — callers pass List.of(...) today,
    // but a mutable list handed in here would remain mutable through the getter.
    this.searchableParams = searchableParams;
    // Id is unknown for parameter-based lookups.
    this.id = null;
  }

public static ResourceNotFoundException forSpecification(Object id) {
Expand All @@ -32,6 +46,13 @@ public static ResourceNotFoundException forField(Object id) {
return new ResourceNotFoundException(Resource.FIELD_DEFINITION, id);
}

public static ResourceNotFoundException forField(UUID specificationId, String fieldTag) {
return new ResourceNotFoundException(Resource.FIELD_DEFINITION, List.of(
Pair.of("specificationId", specificationId),
Pair.of("tag", fieldTag)
));
}

  /**
   * Creates a not-found exception for a field indicator looked up by id.
   *
   * @param id the indicator identifier
   * @return exception for the missing indicator
   */
  public static ResourceNotFoundException forIndicator(Object id) {
    return new ResourceNotFoundException(Resource.FIELD_INDICATOR, id);
  }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.folio.rspec.exception;

import org.folio.rspec.domain.dto.Family;
import org.folio.rspec.domain.dto.FamilyProfile;

/**
 * Thrown when an incoming specification update request event cannot be
 * processed (e.g. the referenced specification does not exist).
 */
public class UpdateRequestProcessingException extends RuntimeException {

  public UpdateRequestProcessingException(String message) {
    super(message);
  }

  /**
   * Creates an exception for an update request whose target specification
   * (identified by family and profile) could not be found.
   *
   * @param family the specification family from the event
   * @param profile the family profile from the event
   * @return exception with a message naming both lookup keys
   */
  public static UpdateRequestProcessingException specificationNotFound(Family family, FamilyProfile profile) {
    var message = String.format("Specification for family=%s, profile=%s not found", family, profile);
    return new UpdateRequestProcessingException(message);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.folio.rspec.integration.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.folio.rspec.domain.dto.UpdateRequestEvent;
import org.folio.rspec.service.processor.request.UpdateRequestEventProcessor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer for specification update request events.
 *
 * <p>Topic pattern, group id and concurrency are resolved at runtime via SpEL
 * from the {@code folioKafkaProperties.listener['update-requests']} settings.
 */
@Log4j2
@Component
@RequiredArgsConstructor
public class EventConsumer {

  // Wraps processing in a folio execution context built from the message headers.
  private final KafkaFolioContextExecutor executor;
  private final UpdateRequestEventProcessor updateRequestProcessor;

  /**
   * Handles a single update request event: logs it, then delegates to the
   * processor inside a tenant-aware execution context.
   *
   * @param updateRequestEvent deserialized event payload
   * @param headers Kafka message headers carrying the okapi context (tenant, token, etc.)
   */
  @KafkaListener(
    containerFactory = "updateRequestEventListenerFactory",
    topicPattern = "#{folioKafkaProperties.listener['update-requests'].topicPattern}",
    groupId = "#{folioKafkaProperties.listener['update-requests'].groupId}",
    concurrency = "#{folioKafkaProperties.listener['update-requests'].concurrency}")
  public void handleSpecUpdateEvents(UpdateRequestEvent updateRequestEvent, MessageHeaders headers) {
    log.info("Received update request [event={}]", updateRequestEvent);
    executor.runInContext(headers, () -> updateRequestProcessor.process(updateRequestEvent));
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.folio.rspec.integration.kafka;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.folio.spring.DefaultFolioExecutionContext;
import org.folio.spring.FolioExecutionContext;
import org.folio.spring.FolioModuleMetadata;
import org.folio.spring.integration.XOkapiHeaders;
import org.folio.spring.scope.FolioExecutionContextSetter;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

/**
 * Runs code inside a {@code FolioExecutionContext} reconstructed from Kafka
 * message headers, so tenant-scoped beans resolve correctly during event
 * processing.
 */
@Component
@RequiredArgsConstructor
public class KafkaFolioContextExecutor {

  private final FolioModuleMetadata moduleMetadata;

  /**
   * Executes the given action with a folio execution context derived from the
   * okapi headers of the Kafka message; the context is torn down afterwards.
   *
   * @param headers Kafka message headers carrying okapi values
   * @param runnable action to execute inside the context
   */
  public void runInContext(MessageHeaders headers, Runnable runnable) {
    var context = getContextFromMessageHeaders(headers, moduleMetadata);
    try (var contextSetter = new FolioExecutionContextSetter(context)) {
      runnable.run();
    }
  }

  /**
   * Builds a {@code FolioExecutionContext} from the okapi headers (tenant,
   * url, token, user id) present on a Kafka message.
   *
   * @param headers Kafka message headers
   * @param moduleMetadata module metadata required by the context
   * @return context populated with whatever okapi headers were present
   */
  public static FolioExecutionContext getContextFromMessageHeaders(MessageHeaders headers,
                                                                   FolioModuleMetadata moduleMetadata) {
    Map<String, Collection<String>> okapiHeaders = new HashMap<>();
    for (var headerName : List.of(XOkapiHeaders.TENANT, XOkapiHeaders.URL,
                                  XOkapiHeaders.TOKEN, XOkapiHeaders.USER_ID)) {
      okapiHeaders.put(headerName, getHeaderValue(headers, headerName));
    }
    return new DefaultFolioExecutionContext(moduleMetadata, okapiHeaders);
  }

  // Header values arrive as UTF-8 byte arrays; an absent header maps to an empty list.
  private static List<String> getHeaderValue(MessageHeaders headers, String headerName) {
    var rawValue = headers.get(headerName);
    if (rawValue == null) {
      return Collections.emptyList();
    }
    return Collections.singletonList(new String((byte[]) rawValue, StandardCharsets.UTF_8));
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.folio.rspec.integration.kafka;

import lombok.Getter;

/**
 * Kafka topic names used by this module.
 */
@Getter
public enum KafkaTopicName {

  // NOTE(review): presumably the inbound update-request topic — its consumer wiring
  // is configured elsewhere (topic pattern comes from folioKafkaProperties); confirm.
  SPECIFICATION_UPDATE("specification-storage.specification.update"),
  // Outbound topic used by the specification-changed event producer.
  SPECIFICATION_UPDATED("specification-storage.specification.updated");

  private final String topicName;

  KafkaTopicName(String topicName) {
    this.topicName = topicName;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@
import org.springframework.stereotype.Component;

@Component
public class SpecificationChangeProducer extends EventProducer<UUID, SpecificationUpdatedEvent> {
public class SpecificationChangedEventProducer extends EventProducer<UUID, SpecificationUpdatedEvent> {

public static final String SPECIFICATION_UPDATED_TOPIC = "specification-storage.specification.updated";

public SpecificationChangeProducer(
  /**
   * Creates the producer for specification-updated events.
   *
   * @param template Kafka template used to publish {@code SpecificationUpdatedEvent} payloads
   * @param context folio execution context supplying tenant information to the base producer
   */
  public SpecificationChangedEventProducer(
    KafkaTemplate<String, SpecificationUpdatedEvent> template,
    FolioExecutionContext context) {
    super(template, context);
  }

@Override
protected String topicName() {
return SPECIFICATION_UPDATED_TOPIC;
return KafkaTopicName.SPECIFICATION_UPDATED.getTopicName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,17 @@ public SubfieldDtoCollection findFieldSubfields(UUID fieldId) {
);
}

public SubfieldDto saveSubfield(UUID specificationId, String fieldTag, SubfieldDto dto) {
log.debug("saveSubfield::dto={}", dto);
return doForFieldOrFail(specificationId, fieldTag,
field -> {
var saved = subfieldService.saveSubfield(field, dto);
eventProducer.sendEvent(field.getSpecification().getId());
return saved;
}
);
}

public SubfieldDto createLocalSubfield(UUID fieldId, SubfieldChangeDto createDto) {
log.debug("createLocalSubfield::fieldId={}, createDto={}", fieldId, createDto);
return doForFieldOrFail(fieldId,
Expand Down Expand Up @@ -162,4 +173,10 @@ private <T> T doForFieldOrFail(UUID fieldId, Function<Field, T> action) {
.map(action)
.orElseThrow(() -> ResourceNotFoundException.forField(fieldId));
}

private <T> T doForFieldOrFail(UUID specificationId, String fieldTag, Function<Field, T> action) {
return fieldRepository.findBySpecificationIdAndTag(specificationId, fieldTag)
.map(action)
.orElseThrow(() -> ResourceNotFoundException.forField(specificationId, fieldTag));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ public SubfieldDtoCollection findFieldSubfields(UUID fieldId) {
return new SubfieldDtoCollection().subfields(subfieldDtos).totalRecords(subfieldDtos.size());
}

public SubfieldDto saveSubfield(Field field, SubfieldDto dto) {
log.debug("saveSubfield::fieldId={}, dto={}", field.getId(), dto);
var entity = mapper.toEntity(dto);
entity.setField(field);
var created = repository.save(entity);
return mapper.toDto(created);
}

public SubfieldDto createLocalSubfield(Field field, SubfieldChangeDto createDto) {
log.info("createLocalSubfield::fieldId={}, dto={}", field.getId(), createDto);
var entity = mapper.toEntity(createDto);
Expand Down Expand Up @@ -82,5 +90,4 @@ public void deleteSubfield(UUID id) {
public void setValidators(List<ScopeValidator<SubfieldChangeDto, Subfield>> validators) {
validators.forEach(validator -> this.validators.put(validator.scope(), validator));
}

}
Loading

0 comments on commit 7737c8b

Please sign in to comment.