Skip to content

Commit

Permalink
MODLD-395: working implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
PBobylev committed Jun 17, 2024
1 parent cfbc449 commit 5c8cc83
Show file tree
Hide file tree
Showing 22 changed files with 101 additions and 60 deletions.
2 changes: 1 addition & 1 deletion .run/LinkedDataApplication [Local-Folio].run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<configuration default="false" name="LinkedDataApplication [Local-Folio]" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="org.folio.linked.data.LinkedDataApplication" />
<module name="mod-linked-data" />
<option name="VM_PARAMETERS" value="-Dspring.profiles.active=folio,search,local" />
<option name="VM_PARAMETERS" value="-Dspring.profiles.active=folio,local" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="org.folio.linked.data.*" />
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/folio/linked/data/client/SearchClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.folio.linked.data.client;

import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;
import static org.folio.linked.data.util.Constants.SEARCH_PROFILE;
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;

Expand All @@ -12,7 +13,7 @@
import org.springframework.web.bind.annotation.RequestBody;

@FeignClient(name = "search")
@Profile(SEARCH_PROFILE)
@Profile({FOLIO_PROFILE, SEARCH_PROFILE})
public interface SearchClient {

@PostMapping(value = "/index/indices", consumes = APPLICATION_JSON_VALUE, produces = APPLICATION_JSON_VALUE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@

@Configuration
@RequiredArgsConstructor
@Profile(FOLIO_PROFILE)
@Profile({FOLIO_PROFILE, SEARCH_PROFILE})
public class FolioKafkaConfig {
private final KafkaProperties kafkaProperties;

@Bean
@Profile(SEARCH_PROFILE)
@Profile({FOLIO_PROFILE, SEARCH_PROFILE})
public ProducerFactory<String, ResourceEvent> searchIndexProducerFactory() {
return new DefaultKafkaProducerFactory<>(getProducerFactoryProperties());
}

@Bean
@Profile(FOLIO_PROFILE)
public ProducerFactory<String, InstanceIngressEvent> instanceIngressProducerFactory() {
return new DefaultKafkaProducerFactory<>(getProducerFactoryProperties());
}
Expand All @@ -45,12 +46,13 @@ private Map<String, Object> getProducerFactoryProperties() {
}

@Bean
@Profile(SEARCH_PROFILE)
@Profile({FOLIO_PROFILE, SEARCH_PROFILE})
public KafkaTemplate<String, ResourceEvent> searchTemplate(ProducerFactory<String, ResourceEvent> factory) {
return new KafkaTemplate<>(factory);
}

@Bean
@Profile(FOLIO_PROFILE)
public KafkaTemplate<String, InstanceIngressEvent> iiTemplate(ProducerFactory<String, InstanceIngressEvent> factory) {
return new KafkaTemplate<>(factory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) t

private Collection<String> getExcludeBeans() {
return List.of(
"TenantService",
"defaultTenantController",
"defaultTenantOkapiHeaderValidationFilter"
);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.folio.linked.data.controller;

import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;
import static org.folio.linked.data.util.Constants.SEARCH_PROFILE;

import lombok.RequiredArgsConstructor;
Expand All @@ -11,7 +12,7 @@
import org.springframework.web.bind.annotation.RestController;

@Validated
@Profile(SEARCH_PROFILE)
@Profile({FOLIO_PROFILE, SEARCH_PROFILE})
@RestController
@RequiredArgsConstructor
public class ReindexingController implements ReindexApi {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package org.folio.linked.data.integration.kafka.sender.inventory;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;
import static org.folio.spring.tools.config.properties.FolioEnvironment.getFolioEnvName;
import static org.folio.spring.tools.kafka.KafkaUtils.getTenantTopicName;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.folio.linked.data.mapper.kafka.KafkaInventoryMessageMapper;
import org.folio.linked.data.model.entity.Resource;
import org.folio.search.domain.dto.InstanceIngressEvent;
import org.folio.search.domain.dto.InventoryInstanceIngressEventEventMetadata;
import org.folio.spring.FolioExecutionContext;
import org.folio.spring.integration.XOkapiHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.core.KafkaTemplate;
Expand All @@ -35,19 +39,21 @@ public class KafkaInventorySenderImpl implements KafkaInventorySender {
@SneakyThrows
public void sendInstanceCreated(Resource resource) {
var tenant = folioExecutionContext.getTenantId();
var tenantTopicName = getTenantTopicName(initialInventoryInstanceIngressTopicName, tenant);
var tenantTopicName = getTenantTopicName(initialInventoryInstanceIngressTopicName, getFolioEnvName(), tenant);
kafkaInventoryMessageMapper.toInstanceIngressPayload(resource)
.map(p -> {
String id = UUID.randomUUID().toString();
return inventoryKafkaTemplate.send(tenantTopicName, id,
new InstanceIngressEvent()
.id(id)
.eventType(InstanceIngressEvent.EventTypeEnum.CREATE_INSTANCE)
.eventPayload(p)
.eventMetadata(new InventoryInstanceIngressEventEventMetadata()
.tenantId(tenant)
)
);
var id = UUID.randomUUID().toString();
var event = new InstanceIngressEvent()
.id(id)
.eventType(InstanceIngressEvent.EventTypeEnum.CREATE_INSTANCE)
.eventPayload(p)
.eventMetadata(new InventoryInstanceIngressEventEventMetadata()
.tenantId(tenant)
);
var pr = new ProducerRecord<>(tenantTopicName, id, event);
pr.headers().add(XOkapiHeaders.URL, folioExecutionContext.getOkapiUrl().getBytes(UTF_8));
pr.headers().add(XOkapiHeaders.TOKEN, folioExecutionContext.getToken().getBytes(UTF_8));
return inventoryKafkaTemplate.send(pr);
}
).ifPresent(f -> logSending(tenantTopicName, f));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.folio.linked.data.integration.kafka.sender.search;

import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;
import static org.folio.linked.data.util.Constants.SEARCH_PROFILE;

import lombok.RequiredArgsConstructor;
Expand All @@ -10,7 +11,7 @@

@Log4j2
@Service
@Profile("!" + SEARCH_PROFILE)
@Profile({"!" + FOLIO_PROFILE + "& !" + SEARCH_PROFILE})
@RequiredArgsConstructor
public class KafkaSearchSenderDummy implements KafkaSearchSender {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.lang.Long.parseLong;
import static org.folio.linked.data.util.BibframeUtils.isSameResource;
import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;
import static org.folio.linked.data.util.Constants.SEARCH_PROFILE;
import static org.folio.linked.data.util.Constants.SEARCH_RESOURCE_NAME;
import static org.folio.search.domain.dto.ResourceEventType.CREATE;
Expand All @@ -26,15 +27,15 @@

@Log4j2
@Service
@Profile(SEARCH_PROFILE)
@Profile({FOLIO_PROFILE, SEARCH_PROFILE})
@RequiredArgsConstructor
public class KafkaSearchSenderImpl implements KafkaSearchSender {

private final KafkaTemplate<String, ResourceEvent> kafkaTemplate;
private final FolioExecutionContext folioExecutionContext;
private final ApplicationEventPublisher eventPublisher;
private final KafkaSearchMessageMapper kafkaSearchMessageMapper;
@Value("${mod-linked-data.kafka.topic.bibframe-index}")
@Value("${mod-linked-data.kafka.topic.search.bibframe-index}")
private String initialBibframeIndexTopicName;

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static java.util.Objects.isNull;

import java.util.Optional;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.folio.linked.data.mapper.ResourceModelMapper;
Expand All @@ -17,6 +18,7 @@
@RequiredArgsConstructor
public class KafkaInventoryMessageMapperImpl implements KafkaInventoryMessageMapper {

private static final String LINKED_DATA_ID = "linkedDataId";
private final Bibframe2MarcMapper bibframe2MarcMapper;
private final ResourceModelMapper resourceModelMapper;

Expand All @@ -31,9 +33,10 @@ public Optional<InstanceIngressPayload> toInstanceIngressPayload(Resource instan
return Optional.empty();
}
var payload = new InstanceIngressPayload()
.sourceRecordIdentifier(String.valueOf(instance.getId()))
.sourceType(InstanceIngressPayload.SourceTypeEnum.BIBFRAME)
.sourceRecordObject(marcJson);
.sourceRecordIdentifier(UUID.randomUUID().toString())
.sourceType(InstanceIngressPayload.SourceTypeEnum.LINKED_DATA)
.sourceRecordObject(marcJson)
.putAdditionalProperty(LINKED_DATA_ID, instance.getId());
return Optional.of(payload);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.folio.linked.data.service.impl;

import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;
import static org.folio.linked.data.util.Constants.SEARCH_PROFILE;

import jakarta.persistence.EntityManager;
Expand All @@ -19,7 +20,7 @@
@Log4j2
@Service
@Transactional(readOnly = true)
@Profile(SEARCH_PROFILE)
@Profile({FOLIO_PROFILE, SEARCH_PROFILE})
@RequiredArgsConstructor
public class BatchIndexServiceImpl implements BatchIndexService {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static java.lang.Boolean.TRUE;
import static org.folio.ld.dictionary.ResourceTypeDictionary.WORK;
import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;
import static org.folio.linked.data.util.Constants.SEARCH_PROFILE;

import java.util.Set;
Expand All @@ -24,7 +25,7 @@
@Log4j2
@Service
@Transactional
@Profile(SEARCH_PROFILE)
@Profile({FOLIO_PROFILE, SEARCH_PROFILE})
@RequiredArgsConstructor
public class ReindexServiceImpl implements ReindexService {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.folio.linked.data.service.impl.tenant;

import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;
import static org.folio.linked.data.util.Constants.SEARCH_PROFILE;

import java.util.Collection;
import lombok.extern.log4j.Log4j2;
import org.folio.spring.FolioExecutionContext;
Expand All @@ -8,12 +11,14 @@
import org.folio.tenant.domain.dto.TenantAttributes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Profile;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Log4j2
@Primary
@Service
@Profile({FOLIO_PROFILE, SEARCH_PROFILE})
public class LinkedTenantService extends TenantService {

private final Collection<TenantServiceWorker> workers;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package org.folio.linked.data.service.impl.tenant.workers;

import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;
import static org.folio.linked.data.util.Constants.SEARCH_PROFILE;

import lombok.RequiredArgsConstructor;
import org.folio.linked.data.service.impl.tenant.TenantServiceWorker;
import org.folio.spring.tools.kafka.KafkaAdminService;
import org.folio.tenant.domain.dto.TenantAttributes;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;

@Profile({FOLIO_PROFILE, SEARCH_PROFILE})
@Service
@RequiredArgsConstructor
public class KafkaAdminWorker implements TenantServiceWorker {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.folio.linked.data.service.impl.tenant.workers;

import static org.folio.linked.data.util.Constants.FOLIO_PROFILE;
import static org.folio.linked.data.util.Constants.SEARCH_PROFILE;
import static org.folio.linked.data.util.Constants.SEARCH_RESOURCE_NAME;

Expand All @@ -14,10 +15,10 @@
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;

@Profile(SEARCH_PROFILE)
@Log4j2
@Service
@RequiredArgsConstructor
@Profile({FOLIO_PROFILE, SEARCH_PROFILE})
public class SearchWorker implements TenantServiceWorker {

private final SearchClient searchClient;
Expand Down
25 changes: 19 additions & 6 deletions src/main/resources/application-folio.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
mod-linked-data:
reindex:
page-size: 1000
kafka:
topic:
search:
bibframe-index: search.bibframe
inventory:
instance-ingress: inventory.instance_ingress

folio:
environment: ${ENV:folio}
okapiUrl: ${okapi.url}
Expand All @@ -17,6 +27,15 @@ folio:
concurrency: ${KAFKA_DATA_IMPORT_INSTANCE_CREATE_CONCURRENCY:1}
topic-pattern: ${KAFKA_DI_INSTANCE_CREATED_TOPIC_PATTERN:(${folio.environment}\.)(.*\.)DI_COMPLETED}
group-id: ${folio.environment}-linked-data-data-import-instances-created-group
retry-interval-ms: ${KAFKA_RETRY_INTERVAL_MS:2000}
retry-delivery-attempts: ${KAFKA_RETRY_DELIVERY_ATTEMPTS:6}
topics:
- name: ${mod-linked-data.kafka.topic.search.bibframe-index:search.bibframe}
numPartitions: ${KAFKA_BIBFRAME_TOPIC_PARTITIONS:3}
replicationFactor: ${KAFKA_BIBFRAME_TOPIC_REPLICATION_FACTOR:}
- name: ${mod-linked-data.kafka.topic.inventory.instance-ingress:inventory.instance_ingress}
numPartitions: ${KAFKA_BIBFRAME_TOPIC_PARTITIONS:3}
replicationFactor: ${KAFKA_BIBFRAME_TOPIC_REPLICATION_FACTOR:}

spring:
kafka:
Expand All @@ -37,9 +56,3 @@ spring:
max.in.flight.requests.per.connection: 5
retries: 5
spring.json.add.type.headers: false

mod-linked-data:
kafka:
topic:
inventory:
instance-ingress: inventory.instance_ingress
6 changes: 4 additions & 2 deletions src/main/resources/application-local.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
spring:
datasource:
url: jdbc:postgresql://localhost:5432/okapi_modules
username: folio_admin
password: folio_admin
kafka:
bootstrap-servers: localhost:9092
bootstrap-servers: localhost:29092
cloud:
openfeign:
client:
config:
search:
url: http://localhost:8085/search
url: http://localhost:9130/search

management:
endpoints:
Expand Down
Loading

0 comments on commit 5c8cc83

Please sign in to comment.