diff --git a/client/src/main/java/de/fraunhofer/iosb/client/dataTransfer/DataTransferController.java b/client/src/main/java/de/fraunhofer/iosb/client/dataTransfer/DataTransferController.java index 16dc896b..9592647d 100644 --- a/client/src/main/java/de/fraunhofer/iosb/client/dataTransfer/DataTransferController.java +++ b/client/src/main/java/de/fraunhofer/iosb/client/dataTransfer/DataTransferController.java @@ -43,7 +43,6 @@ public class DataTransferController { private final Config config; - private final DataTransferEndpoint dataTransferEndpoint; private final DataTransferObservable dataTransferObservable; private final TransferInitiator transferInitiator; @@ -56,7 +55,7 @@ public class DataTransferController { * @param config Read config value transfer timeout and * own URI * @param webService Register data transfer endpoint. - * @param dataEndpointAuthRequestFilter Creating and passing through custom api + * @param authenticationService Creating and passing through custom api * keys for each data transfer. * @param transferProcessManager Initiating a transfer process as a * consumer. @@ -69,7 +68,7 @@ public DataTransferController(Monitor monitor, Config config, WebService webServ authenticationService); this.dataTransferObservable = new DataTransferObservable(monitor); - this.dataTransferEndpoint = new DataTransferEndpoint(monitor, dataTransferObservable); + var dataTransferEndpoint = new DataTransferEndpoint(monitor, dataTransferObservable); webService.registerResource(dataTransferEndpoint); } diff --git a/client/src/main/java/de/fraunhofer/iosb/client/negotiation/Negotiator.java b/client/src/main/java/de/fraunhofer/iosb/client/negotiation/Negotiator.java index ae2018a2..fbf28e3c 100644 --- a/client/src/main/java/de/fraunhofer/iosb/client/negotiation/Negotiator.java +++ b/client/src/main/java/de/fraunhofer/iosb/client/negotiation/Negotiator.java @@ -56,8 +56,7 @@ public Negotiator(ConsumerContractNegotiationManager consumerNegotiationManager, * aborted by throwing an exception. This exception can be inspected using the * getCause() method. */ - StatusResult negotiate(ContractRequest contractRequest) - throws InterruptedException, ExecutionException { + StatusResult negotiate(ContractRequest contractRequest) { var previousAgreements = contractNegotiationStore.queryAgreements(QuerySpec.max()); var relevantAgreements = previousAgreements .filter(agreement -> agreement.getAssetId().equals(contractRequest.getContractOffer().getAssetId())) diff --git a/client/src/main/java/de/fraunhofer/iosb/client/policy/PolicyController.java b/client/src/main/java/de/fraunhofer/iosb/client/policy/PolicyController.java index 66608ba8..dccf0cd1 100644 --- a/client/src/main/java/de/fraunhofer/iosb/client/policy/PolicyController.java +++ b/client/src/main/java/de/fraunhofer/iosb/client/policy/PolicyController.java @@ -36,18 +36,16 @@ public class PolicyController { - private final PolicyServiceConfig config; - private final PolicyDefinitionStore policyDefinitionStore; private final PolicyService policyService; public PolicyController(Monitor monitor, CatalogService catalogService, TypeTransformerRegistry typeTransformerRegistry, Config systemConfig) { - this.config = new PolicyServiceConfig(systemConfig); + var config = new PolicyServiceConfig(systemConfig); - this.policyDefinitionStore = new PolicyDefinitionStore(monitor, this.config.getAcceptedPolicyDefinitionsPath()); - this.policyService = new PolicyService(catalogService, typeTransformerRegistry, this.config, - this.policyDefinitionStore); + this.policyDefinitionStore = new PolicyDefinitionStore(monitor, config.getAcceptedPolicyDefinitionsPath()); + this.policyService = new PolicyService(catalogService, typeTransformerRegistry, config, + this.policyDefinitionStore, monitor); } public Dataset getDataset(URL providerUrl, String assetId) throws InterruptedException { diff --git a/client/src/main/java/de/fraunhofer/iosb/client/policy/PolicyService.java b/client/src/main/java/de/fraunhofer/iosb/client/policy/PolicyService.java index 9cf0049a..6064bbbf 100644 --- a/client/src/main/java/de/fraunhofer/iosb/client/policy/PolicyService.java +++ b/client/src/main/java/de/fraunhofer/iosb/client/policy/PolicyService.java @@ -16,21 +16,10 @@ package de.fraunhofer.iosb.client.policy; import static java.lang.String.format; -import static org.eclipse.edc.jsonld.spi.Namespaces.DCAT_PREFIX; -import static org.eclipse.edc.jsonld.spi.Namespaces.DCAT_SCHEMA; -import static org.eclipse.edc.jsonld.spi.Namespaces.DCT_PREFIX; -import static org.eclipse.edc.jsonld.spi.Namespaces.DCT_SCHEMA; -import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_PREFIX; -import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_SCHEMA; -import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_ACCESS_SERVICE_ATTRIBUTE; -import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCT_FORMAT_ATTRIBUTE; -import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_PREFIX; -import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_SCHEMA; import static org.eclipse.edc.protocol.dsp.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP; import static org.eclipse.edc.spi.query.Criterion.criterion; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.net.URL; import java.util.Collection; import java.util.List; @@ -43,38 +32,37 @@ import java.util.stream.Stream; import org.eclipse.edc.catalog.spi.Catalog; -import org.eclipse.edc.catalog.spi.DataService; import org.eclipse.edc.catalog.spi.Dataset; -import org.eclipse.edc.catalog.spi.Distribution; import org.eclipse.edc.connector.spi.catalog.CatalogService; +import org.eclipse.edc.jsonld.TitaniumJsonLd; +import org.eclipse.edc.jsonld.spi.JsonLd; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.policy.model.Rule; import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.types.domain.asset.Asset; import org.eclipse.edc.transform.spi.TypeTransformerRegistry; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; - import de.fraunhofer.iosb.client.exception.AmbiguousOrNullException; import de.fraunhofer.iosb.client.util.Pair; import jakarta.json.Json; -import jakarta.json.JsonObject; /** * Finds out policy for a given asset id and provider EDC url */ class PolicyService { + private static final String CATALOG_RETRIEVAL_FAILURE_MSG = "Catalog by provider %s couldn't be retrieved: %s"; private final CatalogService catalogService; private final TypeTransformerRegistry transformer; - + private final PolicyServiceConfig config; private final PolicyDefinitionStore policyDefinitionStore; + private final JsonLd jsonLdExpander; + /** * Class constructor * @@ -82,12 +70,15 @@ class PolicyService { * @param transformer Transform json-ld byte-array catalog to catalog class */ public PolicyService(CatalogService catalogService, TypeTransformerRegistry transformer, - PolicyServiceConfig config, PolicyDefinitionStore policyDefinitionStore) { + PolicyServiceConfig config, PolicyDefinitionStore policyDefinitionStore, Monitor monitor) { this.catalogService = catalogService; this.transformer = transformer; this.config = config; this.policyDefinitionStore = policyDefinitionStore; + + this.jsonLdExpander = new TitaniumJsonLd(monitor); + } Dataset getDatasetForAssetId(URL providerUrl, String assetId) throws InterruptedException { @@ -110,33 +101,35 @@ Dataset getDatasetForAssetId(URL providerUrl, String assetId) throws Interrupted } if (catalogResponse.failed()) { - throw new EdcException(format("Catalog by provider %s couldn't be retrieved: %s", providerUrl, + throw new EdcException(format(CATALOG_RETRIEVAL_FAILURE_MSG, providerUrl, catalogResponse.getFailureMessages())); } - JsonObject modifiedCatalogJson; - try { - modifiedCatalogJson = modifyCatalogJson(catalogResponse.getContent()); - } catch (IOException except) { - throw new EdcException(format("Catalog by provider %s couldn't be retrieved: %s", providerUrl, except)); - } + var catalogJson = Json.createReader(new ByteArrayInputStream(catalogResponse.getContent())) + .readObject(); - var catalog = transformer.transform(modifiedCatalogJson, Catalog.class); + var catalogJsonExpansionResult = jsonLdExpander.expand(catalogJson); - if (catalog.failed()) { - throw new EdcException(format("Catalog by provider %s couldn't be retrieved: %s", providerUrl, - catalog.getFailureMessages())); + if (catalogJsonExpansionResult.failed()) { + throw new EdcException(format(CATALOG_RETRIEVAL_FAILURE_MSG, providerUrl, + catalogJsonExpansionResult.getFailureMessages())); } - if (Objects.isNull(catalog.getContent().getDatasets()) || catalog.getContent().getDatasets().size() != 1) { - throw new AmbiguousOrNullException(format("Multiple or no policyDefinitions were found for assetId %s!", - assetId)); + var catalogResult = transformer.transform(catalogJsonExpansionResult.getContent(), Catalog.class); + + if (catalogResult.failed()) { + throw new EdcException(format(CATALOG_RETRIEVAL_FAILURE_MSG, providerUrl, + catalogResult.getFailureMessages())); } - var dataset = catalog.getContent().getDatasets().get(0); - dataset.getDistributions().remove(0); // Remove distribution added to deserialize catalog + var datasets = catalogResult.getContent().getDatasets(); + if (Objects.isNull(datasets) || datasets.size() != 1) { + throw new AmbiguousOrNullException( + format("Multiple or no policyDefinitions were found for assetId %s!", + assetId)); + } - return dataset; + return datasets.get(0); } Pair getAcceptablePolicyForAssetId(URL providerUrl, String assetId) @@ -156,8 +149,7 @@ Pair getAcceptablePolicyForAssetId(URL providerUrl, String asset .orElseThrow(); } else { - throw new EdcException("Could not find any contract policyDefinition matching this connector's accepted " + - "policyDefinitions"); + throw new EdcException("Could not find any acceptable policyDefinition"); } return new Pair.PairBuilder() @@ -166,7 +158,8 @@ Pair getAcceptablePolicyForAssetId(URL providerUrl, String asset private boolean matchesOwnPolicyDefinitions(Policy policy) { return policyDefinitionStore.getPolicyDefinitions().stream().anyMatch( - acceptedPolicyDefinition -> policyDefinitionRulesEquality(acceptedPolicyDefinition.getPolicy(), + acceptedPolicyDefinition -> policyDefinitionRulesEquality( + acceptedPolicyDefinition.getPolicy(), policy)); } @@ -186,7 +179,8 @@ private boolean policyDefinitionRulesEquality(Policy first, Policy second) { .collect(Collectors.toList()); return firstRules.stream().anyMatch( - firstRule -> secondRules.stream().anyMatch(secondRule -> !ruleEquality(firstRule, secondRule))); + firstRule -> secondRules.stream() + .anyMatch(secondRule -> !ruleEquality(firstRule, secondRule))); } private boolean ruleEquality(T first, T second) { @@ -194,48 +188,4 @@ private boolean ruleEquality(T first, T second) { second.getConstraints()); } - /* - * Since EDC api does not return Catalog object directly, resort to another - * solution for now. - */ - private JsonObject modifyCatalogJson(byte[] catalogBytes) throws IOException { - - // Bytes to string. Replace "dcat:" etc. by schema. String to bytes again - catalogBytes = new String(catalogBytes) - .replace(DCAT_PREFIX + ":", DCAT_SCHEMA) - .replace(ODRL_PREFIX + ":", ODRL_SCHEMA) - .replace(DCT_PREFIX + ":", DCT_SCHEMA) - .replace(DSPACE_PREFIX + ":", DSPACE_SCHEMA) - .getBytes(); - - Distribution distribution = Distribution.Builder.newInstance() - .format("JSON") - .dataService(DataService.Builder.newInstance().build()) - .build(); - - JsonNode jsonNode; - var om = new ObjectMapper(); - - var distributionString = om.writeValueAsString(distribution); - var distributionNode = om.readTree(distributionString - .replace("format", DCT_FORMAT_ATTRIBUTE) - .replace("dataService", DCAT_ACCESS_SERVICE_ATTRIBUTE)); - - jsonNode = om.readTree(catalogBytes); - - if (!jsonNode.has(DCAT_SCHEMA + "dataset") || jsonNode.get(DCAT_SCHEMA + "dataset") == null - || jsonNode.get(DCAT_SCHEMA + "dataset").isEmpty()) { - throw new EdcException("No dataset provided in catalog."); - } - - ((ArrayNode) jsonNode - .get(DCAT_SCHEMA + "dataset") - .get(DCAT_SCHEMA + "distribution")) - .add(distributionNode); - catalogBytes = om.writeValueAsBytes(jsonNode); - - var jsonReader = Json.createReader(new ByteArrayInputStream(catalogBytes)); - return jsonReader.readObject(); - } - } diff --git a/client/src/test/java/de/fraunhofer/iosb/client/dataTransfer/TransferInitiatorTest.java b/client/src/test/java/de/fraunhofer/iosb/client/dataTransfer/TransferInitiatorTest.java index 488f1df6..75a65689 100644 --- a/client/src/test/java/de/fraunhofer/iosb/client/dataTransfer/TransferInitiatorTest.java +++ b/client/src/test/java/de/fraunhofer/iosb/client/dataTransfer/TransferInitiatorTest.java @@ -42,15 +42,14 @@ public class TransferInitiatorTest { private final TransferProcessManager mockTransferProcessManager = mock(TransferProcessManager.class); - private Config configMock; private TransferInitiator transferInitiator; private StatusResult mockStatusResult; @BeforeEach @SuppressWarnings("unchecked") - void initializeContractOfferService() throws URISyntaxException { - configMock = ConfigFactory.fromMap(Map.of("edc.dsp.callback.address", "http://localhost:4321/dsp", + void initializeContractOfferService() { + var configMock = ConfigFactory.fromMap(Map.of("edc.dsp.callback.address", "http://localhost:4321/dsp", "web.http.port", "8080", "web.http.path", "/api")); transferInitiator = new TransferInitiator(configMock, mock(Monitor.class), mockTransferProcessManager); diff --git a/client/src/test/java/de/fraunhofer/iosb/client/policy/PolicyServiceTest.java b/client/src/test/java/de/fraunhofer/iosb/client/policy/PolicyServiceTest.java index 8a4b4037..7c762ef8 100644 --- a/client/src/test/java/de/fraunhofer/iosb/client/policy/PolicyServiceTest.java +++ b/client/src/test/java/de/fraunhofer/iosb/client/policy/PolicyServiceTest.java @@ -35,6 +35,7 @@ import org.eclipse.edc.connector.spi.catalog.CatalogService; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.system.configuration.ConfigFactory; @@ -59,7 +60,7 @@ public PolicyServiceTest() throws MalformedURLException { @BeforeEach void initializeContractOfferService() { policyService = new PolicyService(mockCatalogService, mockTransformer, mockConfig(), - mock(PolicyDefinitionStore.class)); + mock(PolicyDefinitionStore.class), mock(Monitor.class)); } @Test diff --git a/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/AasExtension.java b/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/AasExtension.java index 49131819..33ae556b 100644 --- a/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/AasExtension.java +++ b/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/AasExtension.java @@ -62,16 +62,15 @@ public class AasExtension implements ServiceExtension { private static final Logger logger = Logger.getInstance(); private final ScheduledExecutorService syncExecutor = new ScheduledThreadPoolExecutor(1); private AasController aasController; - private ConfigurationController configurationController; @Override public void initialize(ServiceExtensionContext context) { - this.configurationController = new ConfigurationController(context.getConfig(SETTINGS_PREFIX)); + var configurationController = new ConfigurationController(context.getConfig(SETTINGS_PREFIX)); // Distribute controllers, repository var selfDescriptionRepository = new SelfDescriptionRepository(); this.aasController = new AasController(okHttpClient); - var endpoint = new Endpoint(selfDescriptionRepository, this.aasController, this.configurationController); + var endpoint = new Endpoint(selfDescriptionRepository, this.aasController, configurationController); // Initialize/Start synchronizer, start AAS services defined in configuration initializeSynchronizer(selfDescriptionRepository); diff --git a/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/model/aas/CustomSubmodel.java b/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/model/aas/CustomSubmodel.java index 4da277b8..89d7017d 100644 --- a/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/model/aas/CustomSubmodel.java +++ b/edc-extension4aas/src/main/java/de/fraunhofer/iosb/app/model/aas/CustomSubmodel.java @@ -83,11 +83,7 @@ public boolean equals(Object obj) { return false; } - if (!this.getIdShort().equals(other.getIdShort())) { - return false; - } - - return true; + return this.getIdShort().equals(other.getIdShort()); } } diff --git a/example/README.md b/example/README.md index 563e3837..77b93181 100644 --- a/example/README.md +++ b/example/README.md @@ -43,12 +43,12 @@ java "-Dedc.fs.config=./example/configurations/provider.properties" -jar ./examp After building the extension as seen above, a docker image can be built with -0. `cd ./example` -1. `docker build -t edc-aas-extension:latest .` -2. `mkdir workdir` -2. Create configuration under `./workdir/config.properties` -3. Add additional files under `./workdir` (Fitting to your paths in config.properties) -4. Run with `docker run -i -v $PWD/workdir:/workdir/ -e EDC_FS_CONFIG=/workdir/config.properties edc-extension4aas:latest` +1. `cd ./example` +2. `docker build -t edc-aas-extension:latest .` +3. `mkdir workdir` +4. Create configuration under `./workdir/config.properties` +5. Add additional files under `./workdir` (Fitting to your paths in config.properties) +6. Run with `docker run -i -v $PWD/workdir:/workdir/ -e EDC_FS_CONFIG=/workdir/config.properties edc-extension4aas:latest` This docker image can be run individually or **inside a docker-compose file**: