Skip to content

Commit

Permalink
Merge pull request #1 from gbv/dfi_improvements
Browse files Browse the repository at this point in the history
Current (unfinished)state of the importer
  • Loading branch information
sebhofmann authored Sep 16, 2024
2 parents 0108a94 + c3782a2 commit a4a56c7
Show file tree
Hide file tree
Showing 34 changed files with 1,774 additions and 230 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
- name: Upload logs on build failure
if: failure()
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: test-results
path: |
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ It will use the unapi interface to get the data for each ppn.
#### SRU

```properties
importer.sru-sources.gvk.query-pattern=importer.sru-sources.gvk.query-pattern=pica.aed={date} and ((pica.sge="615" and (pica.exk="dfi aktuell" or pica.exk="afa" or pica.exk="dfi compact" or pica.exk="tondokument" or pica.exk="video" or pica.exk="abschlussarbeit" or pica.exk="Zeitschriftenaufsatz" or pica.exk="Sicherheitskopie" or pica.exk="GFfK" or pica.exk="PA-Volltext")) or (pica.sge="lg 3" and (pica.exk="Karikatur" or pica.exk="PA-Volltext" or pica.exk="Presseartikel" or pica.exk="Sicherheitskopie")))
importer.sru-sources.gvk.query-pattern=pica.aed={date} and ((pica.sge="615" and (pica.exk="dfi aktuell" or pica.exk="afa" or pica.exk="dfi compact" or pica.exk="tondokument" or pica.exk="video" or pica.exk="abschlussarbeit" or pica.exk="Zeitschriftenaufsatz" or pica.exk="Sicherheitskopie" or pica.exk="GFfK" or pica.exk="PA-Volltext")) or (pica.sge="lg 3" and (pica.exk="Karikatur" or pica.exk="PA-Volltext" or pica.exk="Presseartikel" or pica.exk="Sicherheitskopie")))
importer.sru-sources.gvk.url=https://sru.k10plus.de/gvk
importer.sru-sources.gvk.date-overwrite=2024-01-01
importer.sru-sources.gvk.oldest-date=2020-01-01
importer.sru-sources.gvk.oldest-date=2024-05-24
importer.sru-sources.gvk.newest-date=2024-05-31
```

The importer looks up the newest record in the database and uses its date as the date parameter for the query.
Expand Down
159 changes: 152 additions & 7 deletions src/main/java/de/vzg/oai_importer/JobService.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import javax.xml.transform.TransformerException;

import org.mycore.oai.pmh.OAIException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;

import de.vzg.oai_importer.foreign.Configuration;
import de.vzg.oai_importer.foreign.jpa.ForeignEntity;
import de.vzg.oai_importer.foreign.jpa.ForeignEntityRepository;
import de.vzg.oai_importer.importer.FileBased;
import de.vzg.oai_importer.importer.Importer;
import de.vzg.oai_importer.mapping.jpa.Mapping;
import de.vzg.oai_importer.mycore.MyCoReSynchronizeService;
Expand Down Expand Up @@ -48,7 +53,6 @@ public class JobService {
@Autowired
ForeignEntityRepository repo;


public Page<ForeignEntity> listImportableRecords(String jobID, Pageable pageable) {
ImportJobConfiguration jobConfig = configuration.getJobs().get(jobID);
String targetConfigId = jobConfig.getTargetConfigId();
Expand Down Expand Up @@ -81,12 +85,13 @@ public Map<ForeignEntity, List<Mapping>> testMapping(String jobID, boolean updat
Configuration source = configuration.getCombinedConfig().get(sourceConfigId);

final List<ForeignEntity> foreignEntities = new ArrayList<>();
if(updatable) {
if (updatable) {
Page<ImporterService.Pair<ForeignEntity, MyCoReObjectInfo>> toUpdate = importerService
.detectUpdateableEntities(sourceConfigId, source, target.getUrl(), Pageable.unpaged());
.detectUpdateableEntities(sourceConfigId, source, target.getUrl(), Pageable.unpaged());
toUpdate.forEach(pair -> foreignEntities.add(pair.first()));
} else {
Page<ForeignEntity> toImport = importerService.detectImportableEntities(sourceConfigId, source, target.getUrl(), Pageable.unpaged());
Page<ForeignEntity> toImport
= importerService.detectImportableEntities(sourceConfigId, source, target.getUrl(), Pageable.unpaged());
toImport.forEach(foreignEntities::add);
}

Expand All @@ -110,8 +115,7 @@ public void runJob(String name) throws OAIException, IOException, URISyntaxExcep

MyCoReTargetConfiguration target = configuration.getTargets().get(targetConfigId);


Configuration source = configuration.getCombinedConfig().get(sourceConfigId);
Configuration source = configuration.getCombinedConfig().get(sourceConfigId);

Page<ForeignEntity> records = importerService
.detectImportableEntities(sourceConfigId, source, target.getUrl(), Pageable.unpaged());
Expand Down Expand Up @@ -143,6 +147,141 @@ public void runJob(String name) throws OAIException, IOException, URISyntaxExcep
});
}

/**
 * Lists the importable files for every updateable record of a job.
 *
 * <p>The updateable record pairs are taken from
 * {@code importerService.detectUpdateableEntities(...)}; for each pair the job's importer is
 * asked for its importable files via {@code FileBased#listImportableFiles}.
 *
 * @param name the id of the job as registered in the job configuration
 * @param pageable paging information forwarded to the updateable-entity lookup
 * @return a page of (foreign entity, importable file list) entries, or an empty page when the
 *     configured importer is not a {@link FileBased} importer
 * @throws RuntimeException wrapping an {@link IOException} or {@link URISyntaxException} raised
 *     while listing the files of a record
 */
public Page<Map.Entry<ForeignEntity, List<String>>> listImportableFiles(String name, Pageable pageable) {
    ImportJobConfiguration jobConfig = configuration.getJobs().get(name);
    String targetConfigId = jobConfig.getTargetConfigId();
    String sourceConfigId = jobConfig.getSourceConfigId();
    MyCoReTargetConfiguration target = configuration.getTargets().get(targetConfigId);
    Configuration source = configuration.getCombinedConfig().get(sourceConfigId);

    Page<ImporterService.Pair<ForeignEntity, MyCoReObjectInfo>> updateable
        = importerService.detectUpdateableEntities(sourceConfigId, source, target.getUrl(), pageable);

    Importer importer = context.getBean(jobConfig.getImporter(), Importer.class);
    importer.setConfig(jobConfig.getImporterConfig());
    log.info("Checking if files are updatable for {} records", updateable.getTotalElements());

    // Only file based importers know how to enumerate files.
    if (!(importer instanceof FileBased fileBased)) {
        return Page.empty();
    }

    return updateable.map(pair -> {
        ForeignEntity entity = pair.first();
        MyCoReObjectInfo objectInfo = pair.second();
        log.info("Checking record {} and mycore object {}", entity.getForeignId(), objectInfo.getMycoreId());

        try {
            List<String> importableFiles = fileBased.listImportableFiles(target, entity);
            return Map.entry(entity, importableFiles);
        } catch (IOException | URISyntaxException ex) {
            // The mapping function cannot throw checked exceptions.
            throw new RuntimeException(ex);
        }
    });
}

/**
 * Determines which files are missing for a single record of a job.
 *
 * <p>The missing files are computed exactly once by {@code FileBased#listMissingFiles}.
 *
 * @param name the id of the job as registered in the job configuration
 * @param foreignID the foreign id of the record to check
 * @return an entry of the foreign entity and its missing files; {@code null} when the configured
 *     importer is not a {@link FileBased} importer or no foreign entity was found for
 *     {@code foreignID}
 * @throws IOException if listing the missing files fails
 * @throws URISyntaxException if listing the missing files fails
 */
public Map.Entry<ForeignEntity, List<String>> runJobFileCheckFor(String name, String foreignID)
    throws IOException, URISyntaxException {
    ImportJobConfiguration jobConfig = configuration.getJobs().get(name);
    String targetConfigId = jobConfig.getTargetConfigId();
    String sourceConfigId = jobConfig.getSourceConfigId();

    MyCoReTargetConfiguration target = configuration.getTargets().get(targetConfigId);

    MyCoReObjectInfo myCoReObjectInfo = mycoreRepo.findFirstByRepositoryAndImportURLAndImportID(target.getUrl(),
        sourceConfigId, foreignID);

    ForeignEntity foreign = repo.findFirstByConfigIdAndForeignId(sourceConfigId, foreignID);

    Importer importer = context.getBean(jobConfig.getImporter(), Importer.class);
    importer.setConfig(jobConfig.getImporterConfig());

    if (!(importer instanceof FileBased fileBased)) {
        log.info("Importer {} is not a FileBased importer", jobConfig.getImporter());
        return null;
    }

    // Map.entry rejects null keys, so an unknown record is reported as "no result" instead.
    // NOTE(review): assumes the repository returns null when nothing matches - confirm.
    if (foreign == null) {
        log.info("No foreign entity found for id {} in source {}", foreignID, sourceConfigId);
        return null;
    }

    // Listing the missing files is potentially expensive, so it is done exactly once.
    List<String> missingFiles = fileBased.listMissingFiles(target, myCoReObjectInfo, foreign);
    return Map.entry(foreign, missingFiles);
}

/**
 * Determines the missing files for every updateable record of a job.
 *
 * <p>The records are checked through a parallel stream. The returned page carries the pageable
 * and total element count of the underlying record page, so callers that page through the
 * results still see whether a next page exists.
 *
 * @param name the id of the job as registered in the job configuration
 * @param pageable paging information forwarded to the updateable-entity lookup
 * @return a page of (foreign entity, missing file list) entries, or an empty page when the
 *     configured importer is not a {@link FileBased} importer
 * @throws RuntimeException wrapping an {@link IOException} or {@link URISyntaxException} raised
 *     while checking a record
 */
public Page<Map.Entry<ForeignEntity, List<String>>> runJobFileCheck(String name, Pageable pageable) {
    ImportJobConfiguration jobConfig = configuration.getJobs().get(name);
    String targetConfigId = jobConfig.getTargetConfigId();
    String sourceConfigId = jobConfig.getSourceConfigId();

    MyCoReTargetConfiguration target = configuration.getTargets().get(targetConfigId);

    Configuration source = configuration.getCombinedConfig().get(sourceConfigId);

    Page<ImporterService.Pair<ForeignEntity, MyCoReObjectInfo>> records
        = importerService.detectUpdateableEntities(sourceConfigId, source, target.getUrl(), pageable);

    Importer importer = context.getBean(jobConfig.getImporter(), Importer.class);
    importer.setConfig(jobConfig.getImporterConfig());
    log.info("Checking if files are updatable for {} records", records.getTotalElements());

    if (!(importer instanceof FileBased fileBased)) {
        log.info("Importer {} is not a FileBased importer", jobConfig.getImporter());
        return Page.empty();
    }

    List<Map.Entry<ForeignEntity, List<String>>> checked = records.stream().parallel().map(pair -> {
        ForeignEntity record = pair.first();
        MyCoReObjectInfo myCoReObjectInfo = pair.second();

        log.info("Checking record {} and mycore object {}", record.getForeignId(),
            myCoReObjectInfo.getMycoreId());

        try {
            return Map.entry(record, fileBased.listMissingFiles(target, myCoReObjectInfo, record));
        } catch (IOException | URISyntaxException ex) {
            // The mapping function cannot throw checked exceptions.
            throw new RuntimeException(ex);
        }
    }).collect(Collectors.toList());

    // Keep pageable and total of the source page; the single-argument PageImpl constructor would
    // report the content as the only page.
    return new PageImpl<>(checked, records.getPageable(), records.getTotalElements());
}

/**
 * Runs {@code FileBased#fixMissingFiles} for every updateable record of a job.
 *
 * @param name the id of the job as registered in the job configuration
 * @param pageable paging information forwarded to the updateable-entity lookup
 * @return a page of (foreign entity, file list returned by {@code fixMissingFiles}) entries, or
 *     an empty page when the configured importer is not a {@link FileBased} importer
 * @throws RuntimeException wrapping an {@link IOException} or {@link URISyntaxException} raised
 *     while fixing the files of a record
 */
public Page<Map.Entry<ForeignEntity, List<String>>> runJobFileImport(String name, Pageable pageable) {
    ImportJobConfiguration jobConfig = configuration.getJobs().get(name);
    String targetConfigId = jobConfig.getTargetConfigId();
    String sourceConfigId = jobConfig.getSourceConfigId();
    MyCoReTargetConfiguration target = configuration.getTargets().get(targetConfigId);
    Configuration source = configuration.getCombinedConfig().get(sourceConfigId);

    Page<ImporterService.Pair<ForeignEntity, MyCoReObjectInfo>> updateable
        = importerService.detectUpdateableEntities(sourceConfigId, source, target.getUrl(), pageable);

    Importer importer = context.getBean(jobConfig.getImporter(), Importer.class);
    importer.setConfig(jobConfig.getImporterConfig());
    log.info("Checking if files are updatable for {} records", updateable.getTotalElements());

    // Importers that are not file based cannot add files.
    if (!(importer instanceof FileBased fileBased)) {
        log.info("Importer {} is not a FileBased importer", jobConfig.getImporter());
        return Page.empty();
    }

    return updateable.map(pair -> {
        ForeignEntity entity = pair.first();
        MyCoReObjectInfo objectInfo = pair.second();
        log.info("Checking record {} and mycore object {}", entity.getForeignId(), objectInfo.getMycoreId());

        try {
            return Map.entry(entity, fileBased.fixMissingFiles(target, objectInfo, entity));
        } catch (IOException | URISyntaxException ex) {
            // The mapping function cannot throw checked exceptions.
            throw new RuntimeException(ex);
        }
    });
}

public void runUpdateJob(String name) {
ImportJobConfiguration jobConfig = configuration.getJobs().get(name);
String targetConfigId = jobConfig.getTargetConfigId();
Expand Down Expand Up @@ -174,7 +313,7 @@ public void runUpdateJob(String name) {
log.info("Records with errors: {}", errorRecords);
}

public void importSingleDocument(String jobID, String recordID) {
public void importSingleDocument(String jobID, String recordID) throws IOException, URISyntaxException, TransformerException {
ImportJobConfiguration jobConfig = configuration.getJobs().get(jobID);
String sourceConfigId = jobConfig.getSourceConfigId();

Expand All @@ -184,6 +323,12 @@ public void importSingleDocument(String jobID, String recordID) {
Importer importer = context.getBean(jobConfig.getImporter(), Importer.class);
importer.setConfig(jobConfig.getImporterConfig());
importer.importRecord(target, testRecord);

try {
myCoReSynchronizeService.synchronize(target);
} catch (IOException | URISyntaxException e) {
throw new RuntimeException(e);
}
}

public void updateSingleDocument(String jobID, String recordID) {
Expand Down
105 changes: 105 additions & 0 deletions src/main/java/de/vzg/oai_importer/OaiImporterCLIApplication.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package de.vzg.oai_importer;

import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jdom2.JDOMException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.AutoConfiguration;
Expand All @@ -14,13 +18,16 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import org.springframework.shell.standard.ShellOption;

import de.vzg.oai_importer.foreign.Configuration;
import de.vzg.oai_importer.foreign.Harvester;
import de.vzg.oai_importer.foreign.jpa.ForeignEntity;
import de.vzg.oai_importer.importer.PPNLIST2MyCoReImporter;
import de.vzg.oai_importer.mycore.MyCoReSynchronizeService;
import de.vzg.oai_importer.mycore.MyCoReTargetConfiguration;
import de.vzg.oai_importer.mycore.jpa.MyCoReObjectInfo;
Expand All @@ -39,6 +46,8 @@ public class OaiImporterCLIApplication {
@Autowired
MyCoReSynchronizeService myCoReSynchronizeService;
@Autowired
ApplicationContext context;
@Autowired
private ImporterConfiguration configuration;
@Autowired
private JobService jobService;
Expand Down Expand Up @@ -71,9 +80,12 @@ public void updateSource(@ShellOption() String job, @ShellOption(defaultValue =
= (Harvester<Configuration>) applicationContext.getBean(source.getHarvester());
List<ForeignEntity> updatedRecords = bean.update(sourceConfigId, source, onlyMissing);
updatedRecords.forEach(record -> LOGGER.info("Updated record {}", record.getForeignId()));
} catch (RuntimeException e) {
LOGGER.error("Error while updating source of job {}", job, e.getCause());
} catch (Exception e) {
LOGGER.error("Error while updating source of job {}", job, e);
}

}

@ShellMethod(key = "update-target", value = "Updates the target of a job")
Expand Down Expand Up @@ -117,6 +129,99 @@ public void runImporter(@ShellOption() String job) {
}
}

/**
 * Shell command that runs the file check of a job and logs every record whose list of missing
 * files is not empty. Any exception raised by the check is logged, not rethrown.
 *
 * @param job the id of the job to check
 */
@ShellMethod(key = "run-importer-file-check", value = "Runs the importer file check")
public void runImporterFileCheck(@ShellOption() String job) {
    // NOTE(review): checkJobPresent is defined elsewhere; a true result aborts the command.
    if (checkJobPresent(job)) {
        return;
    }

    LOGGER.info("Running job {}", job);
    try {
        Page<Map.Entry<ForeignEntity, List<String>>> checked
            = jobService.runJobFileCheck(job, Pageable.unpaged());
        for (Map.Entry<ForeignEntity, List<String>> entry : checked) {
            List<String> missingFiles = entry.getValue();
            if (missingFiles.isEmpty()) {
                continue;
            }
            LOGGER.info("Record {} is missing {} files", entry.getKey().getForeignId(),
                String.join(", ", missingFiles));
        }
    } catch (Exception e) {
        LOGGER.error("Error while running job {}", job, e);
    }
}

/**
 * Shell command that runs the file check of a job for a single record and logs how many files
 * are missing. Any exception raised by the check is logged, not rethrown.
 *
 * @param job the id of the job to check
 * @param recordId the foreign id of the record to check
 */
@ShellMethod(key = "run-importer-file-check-for-record", value = "Runs the importer file check for a record")
public void runImporterFileCheckForRecord(@ShellOption() String job, @ShellOption() String recordId) {
    // NOTE(review): checkJobPresent is defined elsewhere; a true result aborts the command.
    if (checkJobPresent(job)) {
        return;
    }

    LOGGER.info("Running job {} for record {}", job, recordId);
    try {
        Map.Entry<ForeignEntity, List<String>> checked = jobService.runJobFileCheckFor(job, recordId);
        if (checked == null) {
            // The service returns null when the job's importer is not file based, so a plain
            // "not found" message would be misleading here.
            LOGGER.info("No file check result for record {} (record unknown or importer of job {} "
                + "is not file based)", recordId, job);
            return;
        }
        LOGGER.info("Record {} is missing {} files", checked.getKey().getForeignId(),
            checked.getValue().size());
    } catch (Exception e) {
        LOGGER.error("Error while running job {}", job, e);
    }
}

/**
 * Shell command that runs the file import of a job and logs, for every record with a non-empty
 * result, the files returned by the import. Any exception raised by the import is logged, not
 * rethrown.
 *
 * @param job the id of the job to run
 */
@ShellMethod(key = "run-importer-file-import", value = "Runs the importer which imports files to existing records")
public void runImporterFileImport(@ShellOption() String job) {
    // NOTE(review): checkJobPresent is defined elsewhere; a true result aborts the command.
    if (checkJobPresent(job)) {
        return;
    }

    LOGGER.info("Running job {}", job);
    try {
        Page<Map.Entry<ForeignEntity, List<String>>> imported
            = jobService.runJobFileImport(job, Pageable.unpaged());
        for (Map.Entry<ForeignEntity, List<String>> entry : imported) {
            List<String> addedFiles = entry.getValue();
            if (addedFiles.isEmpty()) {
                continue;
            }
            LOGGER.info("Added missing files to {}:{}", entry.getKey().getForeignId(),
                String.join(", ", addedFiles));
        }
    } catch (Exception e) {
        LOGGER.error("Error while running job {}", job, e);
    }
}

/**
 * Shell command that pages through the updateable records of a job (10000 per page) and logs
 * every record for which {@code PPNLIST2MyCoReImporter#shouldNotBeImported} returns true.
 *
 * <p>A record whose check fails with an {@link IOException} or {@link JDOMException} is still
 * reported, but the failure is logged so it can be told apart from a genuine hit.
 *
 * @param job the id of the job to check; its importer bean must be a
 *     {@link PPNLIST2MyCoReImporter}
 */
@ShellMethod(key = "check-never-should-have-been-imported",
    value = "Checks if records have been imported that should not have been imported")
public void checkNeverShouldHaveBeenImported(@ShellOption() String job) {
    // NOTE(review): checkJobPresent is defined elsewhere; a true result aborts the command.
    if (checkJobPresent(job)) {
        return;
    }
    ImportJobConfiguration importJobConfiguration = jobService.configuration.getJobs().get(job);
    PPNLIST2MyCoReImporter importer
        = context.getBean(importJobConfiguration.getImporter(), PPNLIST2MyCoReImporter.class);
    importer.setConfig(importJobConfiguration.getImporterConfig());

    LOGGER.info("Running job {}", job);
    Pageable pageable = Pageable.ofSize(10000);
    Page<ImporterService.Pair<ForeignEntity, MyCoReObjectInfo>> pairs;
    try {
        do {
            long start = System.currentTimeMillis();
            pairs = jobService.listUpdateableRecords(job, pageable);
            LOGGER.info("Retrieved {} records in {}", pairs.getNumberOfElements(),
                System.currentTimeMillis() - start);

            for (ImporterService.Pair<ForeignEntity, MyCoReObjectInfo> pair : pairs) {
                boolean flagged;
                try {
                    flagged = importer.shouldNotBeImported(pair.first(), pair.second());
                } catch (IOException | JDOMException e) {
                    // Still report the record, but keep the reason visible in the log.
                    LOGGER.warn("Could not check record {}, reporting it as not importable",
                        pair.first().getForeignId(), e);
                    flagged = true;
                }
                if (flagged) {
                    LOGGER.info("Record {} with id {} should not have been imported",
                        pair.first().getForeignId(), pair.second().getMycoreId());
                }
            }
            pageable = pageable.next();
        } while (pairs.hasNext());
    } catch (Exception e) {
        LOGGER.error("Error while running job {}", job, e);
    }
}

@ShellMethod(key = "run-update", value = "Runs the update")
public void runUpdate(@ShellOption() String job) {
if (checkJobPresent(job)) {
Expand Down
Loading

0 comments on commit a4a56c7

Please sign in to comment.