Skip to content

Commit

Permalink
MODLD-131: transform DI event message to LD resource (#92)
Browse files Browse the repository at this point in the history
* MODLD-131: transform DI event message to LD resource

* MODLD-131: fix DataImportEventListenerIT

* MODLD-131: fix DataImportEventListenerIT
  • Loading branch information
PBobylev authored Oct 30, 2023
1 parent 784f434 commit 9253890
Show file tree
Hide file tree
Showing 25 changed files with 495 additions and 91 deletions.
29 changes: 0 additions & 29 deletions .github/workflows/pr-build.yml

This file was deleted.

6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<lombok.mapstruct-binding.version>0.2.0</lombok.mapstruct-binding.version>
<hibernate-types.version>2.21.1</hibernate-types.version>
<lib-linked-data-dictionary.version>1.0.0-SNAPSHOT</lib-linked-data-dictionary.version>
<lib-linked-data-marc2ld.version>1.0.0-SNAPSHOT</lib-linked-data-marc2ld.version>

<!-- Test dependencies versions -->
<testcontainers.version>1.19.1</testcontainers.version>
Expand Down Expand Up @@ -80,6 +81,11 @@
<artifactId>lib-linked-data-dictionary</artifactId>
<version>${lib-linked-data-dictionary.version}</version>
</dependency>
<dependency>
<groupId>org.folio</groupId>
<artifactId>lib-linked-data-marc2ld</artifactId>
<version>${lib-linked-data-marc2ld.version}</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;

@EnableCaching
@EnableAsync
@EnableFeignClients
@SpringBootApplication
@ComponentScan({"org.folio.linked.data", "org.folio.marc2ld"})
public class LinkedDataApplication {


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.folio.search.domain.dto.DataImportEvent;
import org.folio.spring.tools.kafka.FolioKafkaProperties;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -55,10 +53,4 @@ public ConsumerFactory<String, DataImportEvent> dataImportEventConsumerFactory()
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
}

@Bean
@Qualifier("dataImportEventProcessor")
// TODO (MODLD-131) - Delete this bean & replace with the actual component that process data import events
public Consumer<DataImportEvent> dataImportEventProcessor() {
return event -> {};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import org.folio.search.domain.dto.DataImportEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Primary
@Configuration
public class ObjectMapperConfig {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;

import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.folio.linked.data.integration.consumer.DataImportEventHandler;
import org.folio.search.domain.dto.DataImportEvent;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
Expand All @@ -18,9 +17,7 @@
public class KafkaMessageListener {

private static final String DI_INSTANCE_CREATED_LISTENER = "mod-linked-data-data-import-instance-created-listener";

@Qualifier("dataImportEventProcessor")
private final Consumer<DataImportEvent> eventProcessor;
private final DataImportEventHandler dataImportEventHandler;

@KafkaListener(
id = DI_INSTANCE_CREATED_LISTENER,
Expand All @@ -30,6 +27,6 @@ public class KafkaMessageListener {
topicPattern = "#{folioKafkaProperties.listener['data-import-instance-create'].topicPattern}")
public void handleDataImportInstanceCreatedEvent(DataImportEvent event) {
// Kafka entry point for data-import "instance created" events (topic pattern configured above).
log.info("Received event: {}", event);
// All transformation/persistence work is delegated to the handler component.
dataImportEventHandler.handle(event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.folio.linked.data.integration.consumer;

import static org.apache.commons.lang3.ObjectUtils.isNotEmpty;
import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.folio.linked.data.service.ResourceService;
import org.folio.marc2ld.mapper.Marc2BibframeMapper;
import org.folio.search.domain.dto.DataImportEvent;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@Log4j2
@Component
@Profile(FOLIO_PROFILE)
@RequiredArgsConstructor
public class DataImportEventHandler {

  private final Marc2BibframeMapper marc2BibframeMapper;
  private final ResourceService resourceService;

  /**
   * Transforms the MARC payload of a data-import event into a Linked Data
   * resource and persists it. Events carrying no MARC record are logged as
   * errors and skipped.
   *
   * @param event the incoming data-import event
   */
  public void handle(DataImportEvent event) {
    var marc = event.getMarc();
    if (!isNotEmpty(marc)) {
      // Nothing to transform — report the malformed event and bail out.
      log.error("DataImportEvent with id [{}], tenant [{}], eventType [{}] has no Marc record inside",
        event.getId(), event.getTenant(), event.getEventType());
      return;
    }
    var mappedResource = marc2BibframeMapper.map(marc);
    var resourceId = resourceService.createResource(mappedResource);
    log.info("DataImportEvent with id [{}] was saved as LD resource with id [{}]", event.getId(), resourceId);
  }
}
32 changes: 32 additions & 0 deletions src/main/java/org/folio/linked/data/mapper/ResourceMapper.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.folio.linked.data.mapper;

import static java.util.stream.Collectors.toSet;
import static org.folio.ld.dictionary.ResourceTypeDictionary.INSTANCE;
import static org.mapstruct.MappingConstants.ComponentModel.SPRING;

Expand All @@ -8,6 +9,8 @@
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.folio.ld.dictionary.PredicateDictionary;
import org.folio.ld.dictionary.ResourceTypeDictionary;
import org.folio.linked.data.domain.dto.InstanceField;
import org.folio.linked.data.domain.dto.ResourceDto;
import org.folio.linked.data.domain.dto.ResourceField;
Expand All @@ -18,7 +21,11 @@
import org.folio.linked.data.mapper.resource.common.inner.InnerResourceMapper;
import org.folio.linked.data.mapper.resource.kafka.KafkaMessageMapper;
import org.folio.linked.data.model.ResourceShortInfo;
import org.folio.linked.data.model.entity.PredicateEntity;
import org.folio.linked.data.model.entity.Resource;
import org.folio.linked.data.model.entity.ResourceEdge;
import org.folio.linked.data.model.entity.ResourceTypeEntity;
import org.folio.linked.data.model.entity.pk.ResourceEdgePk;
import org.folio.search.domain.dto.BibframeIndex;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
Expand Down Expand Up @@ -46,6 +53,31 @@ public abstract class ResourceMapper {

public abstract ResourceShortInfoPage map(Page<ResourceShort> page);

/**
 * Converts a marc2ld {@code Resource} into a persistable {@link Resource}
 * entity, mapping its types and, recursively, its outgoing edges.
 *
 * @param marc2ldResource the marc2ld resource to convert
 * @return the mapped entity, with edges pointing back to it as their source
 */
public Resource toEntity(org.folio.marc2ld.model.Resource marc2ldResource) {
  var entity = new Resource();
  entity.setLabel(marc2ldResource.getLabel());
  entity.setDoc(marc2ldResource.getDoc());
  entity.setResourceHash(marc2ldResource.getResourceHash());
  var types = marc2ldResource.getTypes().stream()
    .map(this::toEntity)
    .collect(toSet());
  entity.setTypes(types);
  // Each edge needs a reference to the entity under construction as its source.
  var edges = marc2ldResource.getOutgoingEdges().stream()
    .map(marc2ldEdge -> toEntity(marc2ldEdge, entity))
    .collect(toSet());
  entity.setOutgoingEdges(edges);
  return entity;
}

public abstract ResourceTypeEntity toEntity(ResourceTypeDictionary dictionary);

/**
 * Converts a marc2ld edge into a {@link ResourceEdge} entity. The composite
 * primary key is built from the source/target resource hashes and the
 * predicate hash; the target resource is mapped recursively.
 *
 * @param marc2ldEdge the marc2ld edge to convert
 * @param source      the already-mapped source entity this edge departs from
 * @return the mapped edge entity
 */
public ResourceEdge toEntity(org.folio.marc2ld.model.ResourceEdge marc2ldEdge, Resource source) {
  var sourceHash = marc2ldEdge.getSource().getResourceHash();
  var targetHash = marc2ldEdge.getTarget().getResourceHash();
  var predicateHash = marc2ldEdge.getPredicate().getHash();
  var result = new ResourceEdge();
  result.setId(new ResourceEdgePk(sourceHash, targetHash, predicateHash));
  result.setSource(source);
  result.setTarget(toEntity(marc2ldEdge.getTarget()));
  result.setPredicate(toEntity(marc2ldEdge.getPredicate()));
  return result;
}

public abstract PredicateEntity toEntity(PredicateDictionary dictionary);

@SneakyThrows
public Resource toEntity(ResourceDto dto) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import jakarta.persistence.Embeddable;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

@Data
@Embeddable
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class ResourceEdgePk implements Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public interface ResourceService {

ResourceDto createResource(ResourceDto resourceRequest);

Long createResource(org.folio.marc2ld.model.Resource resource);

ResourceDto getResourceById(Long id);

ResourceDto updateResource(Long id, ResourceDto bibframeRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ public ResourceDto createResource(ResourceDto resourceDto) {
return resourceMapper.toDto(persisted);
}

/**
 * Persists a marc2ld resource as a Linked Data entity and emits a
 * "resource created" search-index event for it.
 *
 * @param marc2ldResource the marc2ld resource to persist
 * @return the resource hash of the persisted entity
 */
@Override // implements ResourceService.createResource(org.folio.marc2ld.model.Resource)
public Long createResource(org.folio.marc2ld.model.Resource marc2ldResource) {
  var mapped = resourceMapper.toEntity(marc2ldResource);
  var persisted = resourceRepo.save(mapped);
  // Notify the search module so the freshly created resource becomes indexed.
  kafkaSender.sendResourceCreated(resourceMapper.mapToIndex(persisted));
  return persisted.getResourceHash();
}

@Override
@Transactional(readOnly = true)
public ResourceDto getResourceById(Long id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,15 @@
</update>
</changeSet>


<!-- Seeds the bibframe "Identifier" type into type_lookup. The precondition
     makes the changeSet a no-op (MARK_RAN) when the row already exists. -->
<changeSet id="12_add_Identifier_type" author="[email protected]">
<preConditions onFail="MARK_RAN">
<sqlCheck expectedResult="0">
select count(*) from type_lookup tl where tl.type_uri = 'http://bibfra.me/vocab/lite/Identifier'
</sqlCheck>
</preConditions>
<insert tableName="type_lookup">
<column name="type_uri" value="http://bibfra.me/vocab/lite/Identifier"/>
<!-- type_hash is derived from the URI by the DB-side hashing function. -->
<column name="type_hash" valueComputed="hashing.hash_text('http://bibfra.me/vocab/lite/Identifier')"/>
</insert>
</changeSet>
</databaseChangeLog>
5 changes: 4 additions & 1 deletion src/test/java/org/folio/linked/data/e2e/DatabaseFolioIT.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.folio.linked.data.e2e;

import static org.folio.linked.data.test.TestUtil.FOLIO_TEST_PROFILE;
import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;

import org.springframework.test.context.ActiveProfiles;

@ActiveProfiles({"folio", "test-folio"})
/**
 * Runs the {@code DatabaseIT} suite with the FOLIO and FOLIO-test Spring
 * profiles active; all test cases are inherited unchanged.
 */
@ActiveProfiles({FOLIO_PROFILE, FOLIO_TEST_PROFILE})
class DatabaseFolioIT extends DatabaseIT {
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package org.folio.linked.data.e2e;

import static java.util.Objects.nonNull;
import static org.folio.linked.data.test.TestUtil.FOLIO_TEST_PROFILE;
import static org.folio.linked.data.test.TestUtil.TENANT_ID;
import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;
import static org.folio.linked.data.util.Constants.SEARCH_PROFILE;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -9,20 +13,29 @@
import org.folio.linked.data.model.entity.Resource;
import org.folio.linked.data.test.kafka.KafkaSearchIndexTopicListener;
import org.folio.search.domain.dto.ResourceEventType;
import org.folio.spring.tools.kafka.KafkaAdminService;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ActiveProfiles;

@ActiveProfiles({"folio", "test-folio", "search"})
public class ReIndexControllerITFolio extends ReIndexControllerIT {
@ActiveProfiles({FOLIO_PROFILE, FOLIO_TEST_PROFILE, SEARCH_PROFILE})
@Disabled("To be enabled in part 3 task")
public class ReIndexControllerFolioIT extends ReIndexControllerIT {

@Autowired
private KafkaSearchIndexTopicListener consumer;

@BeforeAll
static void beforeAll(@Autowired KafkaAdminService kafkaAdminService) {
// Create the tenant's Kafka topics once, before any test in this class runs.
kafkaAdminService.createTopics(TENANT_ID);
}

@Override
@SneakyThrows
protected void checkKafkaMessageSent(Resource persisted) {
boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS);
boolean messageConsumed = consumer.getLatch().await(30, TimeUnit.SECONDS);
assertTrue(messageConsumed);
if (nonNull(persisted)) {
MatcherAssert.assertThat(consumer.getPayload(), containsString(persisted.getResourceHash().toString()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.folio.linked.data.e2e;

import static org.folio.linked.data.test.TestUtil.TENANT_ID;
import static org.folio.linked.data.test.TestUtil.defaultHeaders;
import static org.folio.linked.data.test.TestUtil.getBibframeSample;
import static org.folio.linked.data.test.TestUtil.getBibframeSampleTest;
Expand All @@ -19,9 +18,7 @@
import org.folio.linked.data.repo.ResourceRepository;
import org.folio.linked.data.test.ResourceEdgeRepository;
import org.folio.spring.test.extension.impl.OkapiConfiguration;
import org.folio.spring.tools.kafka.KafkaAdminService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
Expand Down Expand Up @@ -49,11 +46,6 @@ class ReIndexControllerIT {
@Autowired
private Environment env;

@BeforeAll
static void beforeAll(@Autowired KafkaAdminService kafkaAdminService) {
kafkaAdminService.createTopics(TENANT_ID);
}

@AfterEach
public void clean() {
resourceEdgeRepository.deleteAll();
Expand Down
Loading

0 comments on commit 9253890

Please sign in to comment.