Skip to content

Commit

Permalink
MODLD-445: Only one instance of the work is displayed in search results (#310)
Browse files Browse the repository at this point in the history
  • Loading branch information
askhat-abishev authored Aug 7, 2024
1 parent a6af195 commit 7134a39
Show file tree
Hide file tree
Showing 8 changed files with 451 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ public class ResourceModificationEventListener {
@TransactionalEventListener
/**
 * Handles a {@link ResourceCreatedEvent} after the publishing transaction commits:
 * logs the event and forwards the created resource to every configured message sender.
 *
 * @param resourceCreatedEvent event carrying the persisted resource
 */
public void afterCreate(ResourceCreatedEvent resourceCreatedEvent) {
  log.info("ResourceCreatedEvent received [{}]", resourceCreatedEvent);
  // The event now carries the full Resource entity (not just its id), so no
  // extra repository lookup is needed before producing the messages.
  createMessageSenders.forEach(sender -> sender.produce(resourceCreatedEvent.resource()));
}

@TransactionalEventListener
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
package org.folio.linked.data.model.entity.event;

public record ResourceCreatedEvent(Long id) {
import org.folio.linked.data.model.entity.Resource;

public record ResourceCreatedEvent(Resource resource) {
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package org.folio.linked.data.repo;

import java.util.List;
import org.folio.linked.data.model.entity.ResourceEdge;
import org.folio.linked.data.model.entity.pk.ResourceEdgePk;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Spring Data JPA repository for {@link ResourceEdge} entities keyed by
 * {@link ResourceEdgePk}. Query methods are derived from the composite-id
 * field names (id.sourceHash / id.targetHash).
 */
public interface ResourceEdgeRepository extends JpaRepository<ResourceEdge, ResourceEdgePk> {

  /** Returns all edges whose composite id has the given source hash (outgoing edges of that resource). */
  List<ResourceEdge> findByIdSourceHash(Long hash);

  /** Returns all edges whose composite id has the given target hash (incoming edges of that resource). */
  List<ResourceEdge> findByIdTargetHash(Long hash);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.ObjectUtils.notEqual;
import static org.folio.ld.dictionary.ResourceTypeDictionary.INSTANCE;
import static org.folio.linked.data.util.BibframeUtils.extractWorkFromInstance;
import static org.folio.linked.data.util.Constants.IS_NOT_FOUND;
import static org.folio.linked.data.util.Constants.RESOURCE_WITH_GIVEN_ID;
import static org.folio.linked.data.util.Constants.RESOURCE_WITH_GIVEN_INVENTORY_ID;
Expand Down Expand Up @@ -69,16 +70,17 @@ public ResourceResponseDto createResource(ResourceRequestDto resourceDto) {
log.info("createResource\n[{}]\nfrom DTO [{}]", mapped, resourceDto);
metadataService.ensure(mapped);
var persisted = saveMergingGraph(mapped);
applicationEventPublisher.publishEvent(new ResourceCreatedEvent(persisted.getId()));
applicationEventPublisher.publishEvent(new ResourceCreatedEvent(persisted));
return resourceDtoMapper.toDto(persisted);
}

@Override
/**
 * Persists a dictionary-model resource: maps it to the entity model, saves it
 * (merging with any existing graph), refreshes the parent work's edges, and
 * publishes a {@link ResourceCreatedEvent} carrying the persisted entity.
 *
 * @param modelResource the dictionary-model resource to persist
 * @return the id of the persisted resource
 */
public Long createResource(org.folio.ld.dictionary.model.Resource modelResource) {
  var mapped = resourceModelMapper.toEntity(modelResource);
  var persisted = saveMergingGraph(mapped);
  // Re-attach the parent work's stored edges so downstream listeners index the
  // full work graph (MODLD-445: only one instance was displayed in search).
  refreshWork(persisted);
  log.info("createResource [{}]\nfrom modelResource [{}]", persisted, modelResource);
  // Publish the entity itself, not just its id — listeners consume it directly.
  applicationEventPublisher.publishEvent(new ResourceCreatedEvent(persisted));
  return persisted.getId();
}

Expand Down Expand Up @@ -257,4 +259,20 @@ private void breakEdgesAndDelete(Resource resource) {
breakCircularEdges(resource);
resourceRepo.delete(resource);
}

/**
 * For an INSTANCE resource, re-links its extracted work with the edges already
 * persisted in the database, so the work carries its complete graph.
 * No-op for resources of any other type.
 */
private void refreshWork(Resource resource) {
  if (!resource.isOfType(INSTANCE)) {
    return;
  }
  extractWorkFromInstance(resource).ifPresent(work -> {
    // Attach every stored edge pointing at the work, then its outgoing edges.
    for (var incomingEdge : edgeRepo.findByIdTargetHash(work.getId())) {
      work.addIncomingEdge(incomingEdge);
    }
    addOutgoingEdges(work);
  });
}

/**
 * Attaches to the resource every persisted edge whose source hash equals the
 * resource's id (i.e. its stored outgoing edges).
 */
private void addOutgoingEdges(Resource resource) {
  for (var edge : edgeRepo.findByIdSourceHash(resource.getId())) {
    resource.addOutgoingEdge(edge);
  }
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package org.folio.linked.data.integration;

import static org.apache.commons.collections4.CollectionUtils.isNotEmpty;
import static org.assertj.core.api.Assertions.assertThat;
import static org.folio.ld.dictionary.ResourceTypeDictionary.CONCEPT;
import static org.folio.ld.dictionary.ResourceTypeDictionary.INSTANCE;
import static org.folio.ld.dictionary.ResourceTypeDictionary.PERSON;
import static org.folio.linked.data.model.entity.ResourceSource.MARC;
import static org.folio.linked.data.test.TestUtil.FOLIO_TEST_PROFILE;
import static org.folio.linked.data.test.TestUtil.OBJECT_MAPPER;
import static org.folio.linked.data.test.TestUtil.TENANT_ID;
import static org.folio.linked.data.test.TestUtil.awaitAndAssert;
import static org.folio.linked.data.test.TestUtil.defaultKafkaHeaders;
Expand All @@ -21,11 +23,13 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.folio.linked.data.e2e.base.IntegrationTest;
import org.folio.linked.data.integration.kafka.consumer.DataImportEventHandler;
import org.folio.linked.data.mapper.ResourceModelMapper;
import org.folio.linked.data.model.entity.Resource;
import org.folio.linked.data.model.entity.ResourceEdge;
import org.folio.linked.data.repo.ResourceEdgeRepository;
Expand All @@ -34,6 +38,7 @@
import org.folio.linked.data.service.impl.tenant.TenantScopedExecutionService;
import org.folio.linked.data.test.kafka.KafkaSearchAuthorityAuthorityTopicListener;
import org.folio.linked.data.test.kafka.KafkaSearchWorkIndexTopicListener;
import org.folio.marc4ld.service.marc2ld.bib.MarcBib2ldMapper;
import org.folio.search.domain.dto.DataImportEvent;
import org.folio.search.domain.dto.InstanceIngressEvent;
import org.folio.spring.tools.kafka.FolioMessageProducer;
Expand All @@ -51,7 +56,6 @@
import org.springframework.test.context.ActiveProfiles;
import org.springframework.transaction.annotation.Transactional;

@Transactional
@IntegrationTest
@ActiveProfiles({FOLIO_PROFILE, FOLIO_TEST_PROFILE})
class DataImportEventListenerIT {
Expand Down Expand Up @@ -79,6 +83,10 @@ class DataImportEventListenerIT {
private ResourceService resourceService;
@MockBean
private FolioMessageProducer<InstanceIngressEvent> instanceIngressMessageProducer;
@Autowired
private MarcBib2ldMapper marc2BibframeMapper;
@Autowired
private ResourceModelMapper resourceModelMapper;

@BeforeAll
static void beforeAll(@Autowired KafkaAdminService kafkaAdminService) {
Expand All @@ -102,7 +110,7 @@ public void clean() {
"samples/marc_non_monograph_leader.jsonl, 0",
"samples/marc_monograph_leader.jsonl, 1"
})
public void shouldNotProcessEventForNullableResource(String resource, int interactions) {
void shouldNotProcessEventForNullableResource(String resource, int interactions) {
// given
var marc = loadResourceAsString(resource);
var emittedEvent = instanceCreatedEvent(EVENT_ID_01, TENANT_ID, marc);
Expand All @@ -116,6 +124,7 @@ public void shouldNotProcessEventForNullableResource(String resource, int intera
verify(resourceService, times(interactions)).createResource(any(org.folio.ld.dictionary.model.Resource.class));
}

@Transactional
@Test
void shouldProcessInstanceCreatedEventFromDataImport() {
// given
Expand Down Expand Up @@ -155,6 +164,7 @@ void shouldProcessInstanceCreatedEventFromDataImport() {
verifyNoInteractions(instanceIngressMessageProducer);
}

@Transactional
@Test
void shouldConsumeAuthorityEventFromDataImport() {
// given
Expand Down Expand Up @@ -201,6 +211,39 @@ void shouldConsumeAuthorityEventFromDataImport() {
);
}

@Test
// Regression test for MODLD-445: when a second instance of an already-saved
// work arrives via DataImport, the index message must still be produced for
// the (single) work, matching the expected payload.
void shouldSendToIndexWorkWithTwoInstances() {
  //given
  // First instance is persisted directly through mapper + repositories.
  var firstInstanceMarc = loadResourceAsString("samples/full_marc_sample.jsonl");
  mapAndSave(firstInstanceMarc);
  // Second instance is the same MARC with distinct identifiers, emitted as a
  // DataImport "instance created" Kafka event.
  var secondInstanceMarc = firstInstanceMarc.replace(" 2019493854", " 2019493855")
    .replace("code", "another code")
    .replace("item number", "another item number");
  var emittedEvent = instanceCreatedEvent(EVENT_ID_01, TENANT_ID, secondInstanceMarc);
  var expectedMessage = loadResourceAsString("integration/kafka/search/expected_message.json");

  //when
  eventKafkaTemplate.send(newProducerRecord(emittedEvent));

  //then
  // Await asynchronously until the search-index topic listener has received
  // something, then compare the first message to the expected JSON, ignoring
  // the timestamp field "ts".
  awaitAndAssert(() -> {
    assertTrue(isNotEmpty(kafkaSearchWorkIndexTopicListener.getMessages()));
    kafkaSearchWorkIndexTopicListener.getMessages()
      .stream()
      .findFirst()
      .ifPresent(message -> {
        try {
          assertThat(OBJECT_MAPPER.readValue(message, Object.class))
            .usingRecursiveComparison()
            .ignoringFields("ts")
            .isEqualTo(OBJECT_MAPPER.readValue(expectedMessage, Object.class));
        } catch (JsonProcessingException e) {
          // Parsing failure should fail the test, not be swallowed.
          throw new RuntimeException(e);
        }
      });
  });
}

private void assertWorkIsIndexed(Resource instance) {
var workIdOptional = instance.getOutgoingEdges()
.stream()
Expand Down Expand Up @@ -237,4 +280,23 @@ private ProducerRecord<String, String> newProducerRecord(String emittedEvent) {
return new ProducerRecord(getTopicName(TENANT_ID, DI_COMPLETED_TOPIC), 0,
EVENT_ID_01, emittedEvent, defaultKafkaHeaders());
}

/**
 * Maps a MARC JSON string to a resource entity, saves it, and recursively
 * persists its outgoing edge graph. Does nothing if mapping yields no resource.
 */
private void mapAndSave(String marc) {
  marc2BibframeMapper.fromMarcJson(marc)
    .map(resourceModelMapper::toEntity)
    .map(resourceRepo::save)
    .ifPresent(saved -> saved.getOutgoingEdges().forEach(this::saveEdge));
}

/**
 * Persists a single edge and, recursively, the subgraph hanging off its target:
 * computes the edge's composite id, saves the target resource, saves the edge,
 * then descends into the target's outgoing edges.
 */
private void saveEdge(ResourceEdge edge) {
  edge.computeId();
  var targetResource = edge.getTarget();
  resourceRepo.save(targetResource);
  resourceEdgeRepository.save(edge);
  for (var childEdge : targetResource.getOutgoingEdges()) {
    saveEdge(childEdge);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void create_shouldPersistMappedResourceAndPublishResourceCreatedEvent_forResourc
assertThat(response).isEqualTo(expectedResponse);
var resourceCreateEventCaptor = ArgumentCaptor.forClass(ResourceCreatedEvent.class);
verify(applicationEventPublisher).publishEvent(resourceCreateEventCaptor.capture());
assertThat(work.getId()).isEqualTo(resourceCreateEventCaptor.getValue().id());
assertThat(work.getId()).isEqualTo(resourceCreateEventCaptor.getValue().resource().getId());
}

@Test
Expand All @@ -149,7 +149,7 @@ void create_shouldPersistMappedResourceAndPublishResourceCreatedEvent_forResourc
assertThat(response).isEqualTo(expectedResponse);
var resourceCreateEventCaptor = ArgumentCaptor.forClass(ResourceCreatedEvent.class);
verify(applicationEventPublisher).publishEvent(resourceCreateEventCaptor.capture());
assertThat(work.getId()).isEqualTo(resourceCreateEventCaptor.getValue().id());
assertThat(work.getId()).isEqualTo(resourceCreateEventCaptor.getValue().resource().getId());
}

@Test
Expand Down
Loading

0 comments on commit 7134a39

Please sign in to comment.