Skip to content

Commit

Permalink
Merge pull request #93 from carlos-schmidt/feat/client-improve-catalo…
Browse files Browse the repository at this point in the history
…g-fetching

Improve catalog fetching
  • Loading branch information
carlos-schmidt authored Feb 24, 2024
2 parents 4ebc253 + a3c39ea commit 84c2924
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public class DataTransferController {

private final Config config;

private final DataTransferEndpoint dataTransferEndpoint;
private final DataTransferObservable dataTransferObservable;
private final TransferInitiator transferInitiator;

Expand All @@ -56,7 +55,7 @@ public class DataTransferController {
* @param config Read config value transfer timeout and
* own URI
* @param webService Register data transfer endpoint.
* @param dataEndpointAuthRequestFilter Creating and passing through custom api
* @param authenticationService Creating and passing through custom api
* keys for each data transfer.
* @param transferProcessManager Initiating a transfer process as a
* consumer.
Expand All @@ -69,7 +68,7 @@ public DataTransferController(Monitor monitor, Config config, WebService webServ
authenticationService);

this.dataTransferObservable = new DataTransferObservable(monitor);
this.dataTransferEndpoint = new DataTransferEndpoint(monitor, dataTransferObservable);
var dataTransferEndpoint = new DataTransferEndpoint(monitor, dataTransferObservable);
webService.registerResource(dataTransferEndpoint);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public Negotiator(ConsumerContractNegotiationManager consumerNegotiationManager,
* aborted by throwing an exception. This exception can be inspected using the
* getCause() method.
*/
StatusResult<ContractNegotiation> negotiate(ContractRequest contractRequest)
throws InterruptedException, ExecutionException {
StatusResult<ContractNegotiation> negotiate(ContractRequest contractRequest) {
var previousAgreements = contractNegotiationStore.queryAgreements(QuerySpec.max());
var relevantAgreements = previousAgreements
.filter(agreement -> agreement.getAssetId().equals(contractRequest.getContractOffer().getAssetId()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,16 @@
public class PolicyController {


private final PolicyServiceConfig config;

private final PolicyDefinitionStore policyDefinitionStore;
private final PolicyService policyService;

public PolicyController(Monitor monitor, CatalogService catalogService,
TypeTransformerRegistry typeTransformerRegistry, Config systemConfig) {
this.config = new PolicyServiceConfig(systemConfig);
var config = new PolicyServiceConfig(systemConfig);

this.policyDefinitionStore = new PolicyDefinitionStore(monitor, this.config.getAcceptedPolicyDefinitionsPath());
this.policyService = new PolicyService(catalogService, typeTransformerRegistry, this.config,
this.policyDefinitionStore);
this.policyDefinitionStore = new PolicyDefinitionStore(monitor, config.getAcceptedPolicyDefinitionsPath());
this.policyService = new PolicyService(catalogService, typeTransformerRegistry, config,
this.policyDefinitionStore, monitor);
}

public Dataset getDataset(URL providerUrl, String assetId) throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,10 @@
package de.fraunhofer.iosb.client.policy;

import static java.lang.String.format;
import static org.eclipse.edc.jsonld.spi.Namespaces.DCAT_PREFIX;
import static org.eclipse.edc.jsonld.spi.Namespaces.DCAT_SCHEMA;
import static org.eclipse.edc.jsonld.spi.Namespaces.DCT_PREFIX;
import static org.eclipse.edc.jsonld.spi.Namespaces.DCT_SCHEMA;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_PREFIX;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_SCHEMA;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_ACCESS_SERVICE_ATTRIBUTE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCT_FORMAT_ATTRIBUTE;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_PREFIX;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_SCHEMA;
import static org.eclipse.edc.protocol.dsp.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;
import static org.eclipse.edc.spi.query.Criterion.criterion;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.List;
Expand All @@ -43,51 +32,53 @@
import java.util.stream.Stream;

import org.eclipse.edc.catalog.spi.Catalog;
import org.eclipse.edc.catalog.spi.DataService;
import org.eclipse.edc.catalog.spi.Dataset;
import org.eclipse.edc.catalog.spi.Distribution;
import org.eclipse.edc.connector.spi.catalog.CatalogService;
import org.eclipse.edc.jsonld.TitaniumJsonLd;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.policy.model.Rule;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.types.domain.asset.Asset;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;

import de.fraunhofer.iosb.client.exception.AmbiguousOrNullException;
import de.fraunhofer.iosb.client.util.Pair;
import jakarta.json.Json;
import jakarta.json.JsonObject;

/**
* Finds out policy for a given asset id and provider EDC url
*/
class PolicyService {

private static final String CATALOG_RETRIEVAL_FAILURE_MSG = "Catalog by provider %s couldn't be retrieved: %s";
private final CatalogService catalogService;
private final TypeTransformerRegistry transformer;

private final PolicyServiceConfig config;
private final PolicyDefinitionStore policyDefinitionStore;

private final JsonLd jsonLdExpander;

/**
* Class constructor
*
* @param catalogService Fetching the catalog of a provider.
* @param transformer Transform json-ld byte-array catalog to catalog class
*/
public PolicyService(CatalogService catalogService, TypeTransformerRegistry transformer,
PolicyServiceConfig config, PolicyDefinitionStore policyDefinitionStore) {
PolicyServiceConfig config, PolicyDefinitionStore policyDefinitionStore, Monitor monitor) {
this.catalogService = catalogService;
this.transformer = transformer;

this.config = config;
this.policyDefinitionStore = policyDefinitionStore;

this.jsonLdExpander = new TitaniumJsonLd(monitor);

}

Dataset getDatasetForAssetId(URL providerUrl, String assetId) throws InterruptedException {
Expand All @@ -110,33 +101,35 @@ Dataset getDatasetForAssetId(URL providerUrl, String assetId) throws Interrupted
}

if (catalogResponse.failed()) {
throw new EdcException(format("Catalog by provider %s couldn't be retrieved: %s", providerUrl,
throw new EdcException(format(CATALOG_RETRIEVAL_FAILURE_MSG, providerUrl,
catalogResponse.getFailureMessages()));
}

JsonObject modifiedCatalogJson;
try {
modifiedCatalogJson = modifyCatalogJson(catalogResponse.getContent());
} catch (IOException except) {
throw new EdcException(format("Catalog by provider %s couldn't be retrieved: %s", providerUrl, except));
}
var catalogJson = Json.createReader(new ByteArrayInputStream(catalogResponse.getContent()))
.readObject();

var catalog = transformer.transform(modifiedCatalogJson, Catalog.class);
var catalogJsonExpansionResult = jsonLdExpander.expand(catalogJson);

if (catalog.failed()) {
throw new EdcException(format("Catalog by provider %s couldn't be retrieved: %s", providerUrl,
catalog.getFailureMessages()));
if (catalogJsonExpansionResult.failed()) {
throw new EdcException(format(CATALOG_RETRIEVAL_FAILURE_MSG, providerUrl,
catalogJsonExpansionResult.getFailureMessages()));
}

if (Objects.isNull(catalog.getContent().getDatasets()) || catalog.getContent().getDatasets().size() != 1) {
throw new AmbiguousOrNullException(format("Multiple or no policyDefinitions were found for assetId %s!",
assetId));
var catalogResult = transformer.transform(catalogJsonExpansionResult.getContent(), Catalog.class);

if (catalogResult.failed()) {
throw new EdcException(format(CATALOG_RETRIEVAL_FAILURE_MSG, providerUrl,
catalogResult.getFailureMessages()));
}

var dataset = catalog.getContent().getDatasets().get(0);
dataset.getDistributions().remove(0); // Remove distribution added to deserialize catalog
var datasets = catalogResult.getContent().getDatasets();
if (Objects.isNull(datasets) || datasets.size() != 1) {
throw new AmbiguousOrNullException(
format("Multiple or no policyDefinitions were found for assetId %s!",
assetId));
}

return dataset;
return datasets.get(0);
}

Pair<String, Policy> getAcceptablePolicyForAssetId(URL providerUrl, String assetId)
Expand All @@ -156,8 +149,7 @@ Pair<String, Policy> getAcceptablePolicyForAssetId(URL providerUrl, String asset
.orElseThrow();

} else {
throw new EdcException("Could not find any contract policyDefinition matching this connector's accepted " +
"policyDefinitions");
throw new EdcException("Could not find any acceptable policyDefinition");
}

return new Pair.PairBuilder<String, Policy>()
Expand All @@ -166,7 +158,8 @@ Pair<String, Policy> getAcceptablePolicyForAssetId(URL providerUrl, String asset

private boolean matchesOwnPolicyDefinitions(Policy policy) {
return policyDefinitionStore.getPolicyDefinitions().stream().anyMatch(
acceptedPolicyDefinition -> policyDefinitionRulesEquality(acceptedPolicyDefinition.getPolicy(),
acceptedPolicyDefinition -> policyDefinitionRulesEquality(
acceptedPolicyDefinition.getPolicy(),
policy));
}

Expand All @@ -186,56 +179,13 @@ private boolean policyDefinitionRulesEquality(Policy first, Policy second) {
.collect(Collectors.toList());

return firstRules.stream().anyMatch(
firstRule -> secondRules.stream().anyMatch(secondRule -> !ruleEquality(firstRule, secondRule)));
firstRule -> secondRules.stream()
.anyMatch(secondRule -> !ruleEquality(firstRule, secondRule)));
}

private <T extends Rule> boolean ruleEquality(T first, T second) {
return Objects.equals(first.getAction(), second.getAction()) && Objects.equals(first.getConstraints(),
second.getConstraints());
}

/*
* Since EDC api does not return Catalog object directly, resort to another
* solution for now.
*/
private JsonObject modifyCatalogJson(byte[] catalogBytes) throws IOException {

// Bytes to string. Replace "dcat:" etc. by schema. String to bytes again
catalogBytes = new String(catalogBytes)
.replace(DCAT_PREFIX + ":", DCAT_SCHEMA)
.replace(ODRL_PREFIX + ":", ODRL_SCHEMA)
.replace(DCT_PREFIX + ":", DCT_SCHEMA)
.replace(DSPACE_PREFIX + ":", DSPACE_SCHEMA)
.getBytes();

Distribution distribution = Distribution.Builder.newInstance()
.format("JSON")
.dataService(DataService.Builder.newInstance().build())
.build();

JsonNode jsonNode;
var om = new ObjectMapper();

var distributionString = om.writeValueAsString(distribution);
var distributionNode = om.readTree(distributionString
.replace("format", DCT_FORMAT_ATTRIBUTE)
.replace("dataService", DCAT_ACCESS_SERVICE_ATTRIBUTE));

jsonNode = om.readTree(catalogBytes);

if (!jsonNode.has(DCAT_SCHEMA + "dataset") || jsonNode.get(DCAT_SCHEMA + "dataset") == null
|| jsonNode.get(DCAT_SCHEMA + "dataset").isEmpty()) {
throw new EdcException("No dataset provided in catalog.");
}

((ArrayNode) jsonNode
.get(DCAT_SCHEMA + "dataset")
.get(DCAT_SCHEMA + "distribution"))
.add(distributionNode);
catalogBytes = om.writeValueAsBytes(jsonNode);

var jsonReader = Json.createReader(new ByteArrayInputStream(catalogBytes));
return jsonReader.readObject();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,14 @@
public class TransferInitiatorTest {

private final TransferProcessManager mockTransferProcessManager = mock(TransferProcessManager.class);
private Config configMock;

private TransferInitiator transferInitiator;
private StatusResult<TransferProcess> mockStatusResult;

@BeforeEach
@SuppressWarnings("unchecked")
void initializeContractOfferService() throws URISyntaxException {
configMock = ConfigFactory.fromMap(Map.of("edc.dsp.callback.address", "http://localhost:4321/dsp",
void initializeContractOfferService() {
var configMock = ConfigFactory.fromMap(Map.of("edc.dsp.callback.address", "http://localhost:4321/dsp",
"web.http.port", "8080", "web.http.path", "/api"));

transferInitiator = new TransferInitiator(configMock, mock(Monitor.class), mockTransferProcessManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.edc.connector.spi.catalog.CatalogService;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
Expand All @@ -59,7 +60,7 @@ public PolicyServiceTest() throws MalformedURLException {
@BeforeEach
void initializeContractOfferService() {
policyService = new PolicyService(mockCatalogService, mockTransformer, mockConfig(),
mock(PolicyDefinitionStore.class));
mock(PolicyDefinitionStore.class), mock(Monitor.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,15 @@ public class AasExtension implements ServiceExtension {
private static final Logger logger = Logger.getInstance();
private final ScheduledExecutorService syncExecutor = new ScheduledThreadPoolExecutor(1);
private AasController aasController;
private ConfigurationController configurationController;

@Override
public void initialize(ServiceExtensionContext context) {
this.configurationController = new ConfigurationController(context.getConfig(SETTINGS_PREFIX));
var configurationController = new ConfigurationController(context.getConfig(SETTINGS_PREFIX));

// Distribute controllers, repository
var selfDescriptionRepository = new SelfDescriptionRepository();
this.aasController = new AasController(okHttpClient);
var endpoint = new Endpoint(selfDescriptionRepository, this.aasController, this.configurationController);
var endpoint = new Endpoint(selfDescriptionRepository, this.aasController, configurationController);

// Initialize/Start synchronizer, start AAS services defined in configuration
initializeSynchronizer(selfDescriptionRepository);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,7 @@ public boolean equals(Object obj) {
return false;
}

if (!this.getIdShort().equals(other.getIdShort())) {
return false;
}

return true;
return this.getIdShort().equals(other.getIdShort());
}

}
12 changes: 6 additions & 6 deletions example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ java "-Dedc.fs.config=./example/configurations/provider.properties" -jar ./examp

After building the extension as seen above, a docker image can be built with

0. `cd ./example`
1. `docker build -t edc-aas-extension:latest .`
2. `mkdir workdir`
2. Create configuration under `./workdir/config.properties`
3. Add additional files under `./workdir` (Fitting to your paths in config.properties)
4. Run with `docker run -i -v $PWD/workdir:/workdir/ -e EDC_FS_CONFIG=/workdir/config.properties edc-extension4aas:latest`
1. `cd ./example`
2. `docker build -t edc-aas-extension:latest .`
3. `mkdir workdir`
4. Create configuration under `./workdir/config.properties`
5. Add additional files under `./workdir` (Fitting to your paths in config.properties)
6. Run with `docker run -i -v $PWD/workdir:/workdir/ -e EDC_FS_CONFIG=/workdir/config.properties edc-extension4aas:latest`

This docker image can be run individually or **inside a docker-compose file**:

Expand Down

0 comments on commit 84c2924

Please sign in to comment.