Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve catalog fetching #93

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public class DataTransferController {

private final Config config;

private final DataTransferEndpoint dataTransferEndpoint;
private final DataTransferObservable dataTransferObservable;
private final TransferInitiator transferInitiator;

Expand All @@ -56,7 +55,7 @@ public class DataTransferController {
* @param config Read config value transfer timeout and
* own URI
* @param webService Register data transfer endpoint.
* @param dataEndpointAuthRequestFilter Creating and passing through custom api
* @param authenticationService Creating and passing through custom api
* keys for each data transfer.
* @param transferProcessManager Initiating a transfer process as a
* consumer.
Expand All @@ -69,7 +68,7 @@ public DataTransferController(Monitor monitor, Config config, WebService webServ
authenticationService);

this.dataTransferObservable = new DataTransferObservable(monitor);
this.dataTransferEndpoint = new DataTransferEndpoint(monitor, dataTransferObservable);
var dataTransferEndpoint = new DataTransferEndpoint(monitor, dataTransferObservable);
webService.registerResource(dataTransferEndpoint);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public Negotiator(ConsumerContractNegotiationManager consumerNegotiationManager,
* aborted by throwing an exception. This exception can be inspected using the
* getCause() method.
*/
StatusResult<ContractNegotiation> negotiate(ContractRequest contractRequest)
throws InterruptedException, ExecutionException {
StatusResult<ContractNegotiation> negotiate(ContractRequest contractRequest) {
var previousAgreements = contractNegotiationStore.queryAgreements(QuerySpec.max());
var relevantAgreements = previousAgreements
.filter(agreement -> agreement.getAssetId().equals(contractRequest.getContractOffer().getAssetId()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,16 @@
public class PolicyController {


private final PolicyServiceConfig config;

private final PolicyDefinitionStore policyDefinitionStore;
private final PolicyService policyService;

public PolicyController(Monitor monitor, CatalogService catalogService,
TypeTransformerRegistry typeTransformerRegistry, Config systemConfig) {
this.config = new PolicyServiceConfig(systemConfig);
var config = new PolicyServiceConfig(systemConfig);

this.policyDefinitionStore = new PolicyDefinitionStore(monitor, this.config.getAcceptedPolicyDefinitionsPath());
this.policyService = new PolicyService(catalogService, typeTransformerRegistry, this.config,
this.policyDefinitionStore);
this.policyDefinitionStore = new PolicyDefinitionStore(monitor, config.getAcceptedPolicyDefinitionsPath());
this.policyService = new PolicyService(catalogService, typeTransformerRegistry, config,
this.policyDefinitionStore, monitor);
}

public Dataset getDataset(URL providerUrl, String assetId) throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,10 @@
package de.fraunhofer.iosb.client.policy;

import static java.lang.String.format;
import static org.eclipse.edc.jsonld.spi.Namespaces.DCAT_PREFIX;
import static org.eclipse.edc.jsonld.spi.Namespaces.DCAT_SCHEMA;
import static org.eclipse.edc.jsonld.spi.Namespaces.DCT_PREFIX;
import static org.eclipse.edc.jsonld.spi.Namespaces.DCT_SCHEMA;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_PREFIX;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_SCHEMA;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_ACCESS_SERVICE_ATTRIBUTE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCT_FORMAT_ATTRIBUTE;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_PREFIX;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_SCHEMA;
import static org.eclipse.edc.protocol.dsp.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;
import static org.eclipse.edc.spi.query.Criterion.criterion;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.List;
Expand All @@ -43,51 +32,53 @@
import java.util.stream.Stream;

import org.eclipse.edc.catalog.spi.Catalog;
import org.eclipse.edc.catalog.spi.DataService;
import org.eclipse.edc.catalog.spi.Dataset;
import org.eclipse.edc.catalog.spi.Distribution;
import org.eclipse.edc.connector.spi.catalog.CatalogService;
import org.eclipse.edc.jsonld.TitaniumJsonLd;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.policy.model.Rule;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.types.domain.asset.Asset;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;

import de.fraunhofer.iosb.client.exception.AmbiguousOrNullException;
import de.fraunhofer.iosb.client.util.Pair;
import jakarta.json.Json;
import jakarta.json.JsonObject;

/**
* Finds out policy for a given asset id and provider EDC url
*/
class PolicyService {

private static final String CATALOG_RETRIEVAL_FAILURE_MSG = "Catalog by provider %s couldn't be retrieved: %s";
private final CatalogService catalogService;
private final TypeTransformerRegistry transformer;

private final PolicyServiceConfig config;
private final PolicyDefinitionStore policyDefinitionStore;

private final JsonLd jsonLdExpander;

/**
* Class constructor
*
* @param catalogService Fetching the catalog of a provider.
* @param transformer Transform json-ld byte-array catalog to catalog class
*/
public PolicyService(CatalogService catalogService, TypeTransformerRegistry transformer,
PolicyServiceConfig config, PolicyDefinitionStore policyDefinitionStore) {
PolicyServiceConfig config, PolicyDefinitionStore policyDefinitionStore, Monitor monitor) {
this.catalogService = catalogService;
this.transformer = transformer;

this.config = config;
this.policyDefinitionStore = policyDefinitionStore;

this.jsonLdExpander = new TitaniumJsonLd(monitor);

}

Dataset getDatasetForAssetId(URL providerUrl, String assetId) throws InterruptedException {
Expand All @@ -110,33 +101,35 @@ Dataset getDatasetForAssetId(URL providerUrl, String assetId) throws Interrupted
}

if (catalogResponse.failed()) {
throw new EdcException(format("Catalog by provider %s couldn't be retrieved: %s", providerUrl,
throw new EdcException(format(CATALOG_RETRIEVAL_FAILURE_MSG, providerUrl,
catalogResponse.getFailureMessages()));
}

JsonObject modifiedCatalogJson;
try {
modifiedCatalogJson = modifyCatalogJson(catalogResponse.getContent());
} catch (IOException except) {
throw new EdcException(format("Catalog by provider %s couldn't be retrieved: %s", providerUrl, except));
}
var catalogJson = Json.createReader(new ByteArrayInputStream(catalogResponse.getContent()))
.readObject();

var catalog = transformer.transform(modifiedCatalogJson, Catalog.class);
var catalogJsonExpansionResult = jsonLdExpander.expand(catalogJson);

if (catalog.failed()) {
throw new EdcException(format("Catalog by provider %s couldn't be retrieved: %s", providerUrl,
catalog.getFailureMessages()));
if (catalogJsonExpansionResult.failed()) {
throw new EdcException(format(CATALOG_RETRIEVAL_FAILURE_MSG, providerUrl,
catalogJsonExpansionResult.getFailureMessages()));
}

if (Objects.isNull(catalog.getContent().getDatasets()) || catalog.getContent().getDatasets().size() != 1) {
throw new AmbiguousOrNullException(format("Multiple or no policyDefinitions were found for assetId %s!",
assetId));
var catalogResult = transformer.transform(catalogJsonExpansionResult.getContent(), Catalog.class);

if (catalogResult.failed()) {
throw new EdcException(format(CATALOG_RETRIEVAL_FAILURE_MSG, providerUrl,
catalogResult.getFailureMessages()));
}

var dataset = catalog.getContent().getDatasets().get(0);
dataset.getDistributions().remove(0); // Remove distribution added to deserialize catalog
var datasets = catalogResult.getContent().getDatasets();
if (Objects.isNull(datasets) || datasets.size() != 1) {
throw new AmbiguousOrNullException(
format("Multiple or no policyDefinitions were found for assetId %s!",
assetId));
}

return dataset;
return datasets.get(0);
}

Pair<String, Policy> getAcceptablePolicyForAssetId(URL providerUrl, String assetId)
Expand All @@ -156,8 +149,7 @@ Pair<String, Policy> getAcceptablePolicyForAssetId(URL providerUrl, String asset
.orElseThrow();

} else {
throw new EdcException("Could not find any contract policyDefinition matching this connector's accepted " +
"policyDefinitions");
throw new EdcException("Could not find any acceptable policyDefinition");
}

return new Pair.PairBuilder<String, Policy>()
Expand All @@ -166,7 +158,8 @@ Pair<String, Policy> getAcceptablePolicyForAssetId(URL providerUrl, String asset

private boolean matchesOwnPolicyDefinitions(Policy policy) {
return policyDefinitionStore.getPolicyDefinitions().stream().anyMatch(
acceptedPolicyDefinition -> policyDefinitionRulesEquality(acceptedPolicyDefinition.getPolicy(),
acceptedPolicyDefinition -> policyDefinitionRulesEquality(
acceptedPolicyDefinition.getPolicy(),
policy));
}

Expand All @@ -186,56 +179,13 @@ private boolean policyDefinitionRulesEquality(Policy first, Policy second) {
.collect(Collectors.toList());

return firstRules.stream().anyMatch(
firstRule -> secondRules.stream().anyMatch(secondRule -> !ruleEquality(firstRule, secondRule)));
firstRule -> secondRules.stream()
.anyMatch(secondRule -> !ruleEquality(firstRule, secondRule)));
}

private <T extends Rule> boolean ruleEquality(T first, T second) {
return Objects.equals(first.getAction(), second.getAction()) && Objects.equals(first.getConstraints(),
second.getConstraints());
}

/*
* Since EDC api does not return Catalog object directly, resort to another
* solution for now.
*/
private JsonObject modifyCatalogJson(byte[] catalogBytes) throws IOException {

// Bytes to string. Replace "dcat:" etc. by schema. String to bytes again
catalogBytes = new String(catalogBytes)
.replace(DCAT_PREFIX + ":", DCAT_SCHEMA)
.replace(ODRL_PREFIX + ":", ODRL_SCHEMA)
.replace(DCT_PREFIX + ":", DCT_SCHEMA)
.replace(DSPACE_PREFIX + ":", DSPACE_SCHEMA)
.getBytes();

Distribution distribution = Distribution.Builder.newInstance()
.format("JSON")
.dataService(DataService.Builder.newInstance().build())
.build();

JsonNode jsonNode;
var om = new ObjectMapper();

var distributionString = om.writeValueAsString(distribution);
var distributionNode = om.readTree(distributionString
.replace("format", DCT_FORMAT_ATTRIBUTE)
.replace("dataService", DCAT_ACCESS_SERVICE_ATTRIBUTE));

jsonNode = om.readTree(catalogBytes);

if (!jsonNode.has(DCAT_SCHEMA + "dataset") || jsonNode.get(DCAT_SCHEMA + "dataset") == null
|| jsonNode.get(DCAT_SCHEMA + "dataset").isEmpty()) {
throw new EdcException("No dataset provided in catalog.");
}

((ArrayNode) jsonNode
.get(DCAT_SCHEMA + "dataset")
.get(DCAT_SCHEMA + "distribution"))
.add(distributionNode);
catalogBytes = om.writeValueAsBytes(jsonNode);

var jsonReader = Json.createReader(new ByteArrayInputStream(catalogBytes));
return jsonReader.readObject();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,14 @@
public class TransferInitiatorTest {

private final TransferProcessManager mockTransferProcessManager = mock(TransferProcessManager.class);
private Config configMock;

private TransferInitiator transferInitiator;
private StatusResult<TransferProcess> mockStatusResult;

@BeforeEach
@SuppressWarnings("unchecked")
void initializeContractOfferService() throws URISyntaxException {
configMock = ConfigFactory.fromMap(Map.of("edc.dsp.callback.address", "http://localhost:4321/dsp",
void initializeContractOfferService() {
var configMock = ConfigFactory.fromMap(Map.of("edc.dsp.callback.address", "http://localhost:4321/dsp",
"web.http.port", "8080", "web.http.path", "/api"));

transferInitiator = new TransferInitiator(configMock, mock(Monitor.class), mockTransferProcessManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.edc.connector.spi.catalog.CatalogService;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
Expand All @@ -59,7 +60,7 @@ public PolicyServiceTest() throws MalformedURLException {
@BeforeEach
void initializeContractOfferService() {
policyService = new PolicyService(mockCatalogService, mockTransformer, mockConfig(),
mock(PolicyDefinitionStore.class));
mock(PolicyDefinitionStore.class), mock(Monitor.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,15 @@ public class AasExtension implements ServiceExtension {
private static final Logger logger = Logger.getInstance();
private final ScheduledExecutorService syncExecutor = new ScheduledThreadPoolExecutor(1);
private AasController aasController;
private ConfigurationController configurationController;

@Override
public void initialize(ServiceExtensionContext context) {
this.configurationController = new ConfigurationController(context.getConfig(SETTINGS_PREFIX));
var configurationController = new ConfigurationController(context.getConfig(SETTINGS_PREFIX));

// Distribute controllers, repository
var selfDescriptionRepository = new SelfDescriptionRepository();
this.aasController = new AasController(okHttpClient);
var endpoint = new Endpoint(selfDescriptionRepository, this.aasController, this.configurationController);
var endpoint = new Endpoint(selfDescriptionRepository, this.aasController, configurationController);

// Initialize/Start synchronizer, start AAS services defined in configuration
initializeSynchronizer(selfDescriptionRepository);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,7 @@ public boolean equals(Object obj) {
return false;
}

if (!this.getIdShort().equals(other.getIdShort())) {
return false;
}

return true;
return this.getIdShort().equals(other.getIdShort());
}

}
12 changes: 6 additions & 6 deletions example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ java "-Dedc.fs.config=./example/configurations/provider.properties" -jar ./examp

After building the extension as seen above, a docker image can be built with

0. `cd ./example`
1. `docker build -t edc-aas-extension:latest .`
2. `mkdir workdir`
2. Create configuration under `./workdir/config.properties`
3. Add additional files under `./workdir` (Fitting to your paths in config.properties)
4. Run with `docker run -i -v $PWD/workdir:/workdir/ -e EDC_FS_CONFIG=/workdir/config.properties edc-extension4aas:latest`
1. `cd ./example`
2. `docker build -t edc-aas-extension:latest .`
3. `mkdir workdir`
4. Create configuration under `./workdir/config.properties`
5. Add additional files under `./workdir` (Fitting to your paths in config.properties)
6. Run with `docker run -i -v $PWD/workdir:/workdir/ -e EDC_FS_CONFIG=/workdir/config.properties edc-extension4aas:latest`

This docker image can be run individually or **inside a docker-compose file**:

Expand Down