Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract client extension #65

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 83 additions & 62 deletions README.md

Large diffs are not rendered by default.

45 changes: 45 additions & 0 deletions client/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
plugins {
`java-library`
jacoco
}

val javaVersion: String by project
val edcVersion: String by project
val rsApi: String by project
val mockitoVersion: String by project
val mockserverVersion: String by project

java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(javaVersion))
}
}

dependencies {
// See this project's README.MD for explanations
implementation("$group:contract-core:$edcVersion")
implementation("$group:dsp-catalog-http-dispatcher:$edcVersion")
implementation("$group:management-api:$edcVersion")
implementation("$group:runtime-metamodel:$edcVersion")
implementation("$group:data-plane-http-spi:$edcVersion") // HttpDataAddress

implementation("jakarta.ws.rs:jakarta.ws.rs-api:${rsApi}")

testImplementation("$group:junit:$edcVersion")
testImplementation("org.glassfish.jersey.core:jersey-common:3.1.3")
testImplementation("org.mockito:mockito-core:${mockitoVersion}")
testImplementation("org.mock-server:mockserver-junit-jupiter:${mockserverVersion}")
testImplementation("org.mock-server:mockserver-netty:${mockserverVersion}")
}

repositories {
mavenCentral()
}

tasks.test {
useJUnitPlatform()
}

tasks.jacocoTestReport {
dependsOn(tasks.test) // tests are required to run before generating the report
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2021 Fraunhofer IOSB, eine rechtlich nicht selbstaendige
* Einrichtung der Fraunhofer-Gesellschaft zur Foerderung der angewandten
* Forschung e.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.fraunhofer.iosb.client;

import org.eclipse.edc.api.auth.spi.AuthenticationService;
import org.eclipse.edc.connector.contract.spi.negotiation.ConsumerContractNegotiationManager;
import org.eclipse.edc.connector.contract.spi.negotiation.observe.ContractNegotiationObservable;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.spi.catalog.CatalogService;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.web.spi.WebService;

import de.fraunhofer.iosb.client.dataTransfer.DataTransferController;
import de.fraunhofer.iosb.client.negotiation.NegotiationController;
import de.fraunhofer.iosb.client.policy.PolicyController;

public class ClientExtension implements ServiceExtension {

@Inject
private AuthenticationService authenticationService;
@Inject
private CatalogService catalogService;
@Inject
private ConsumerContractNegotiationManager consumerNegotiationManager;
@Inject
private ContractNegotiationObservable contractNegotiationObservable;
@Inject
private ContractNegotiationStore contractNegotiationStore;
@Inject
private TransferProcessManager transferProcessManager;
@Inject
private TypeTransformerRegistry transformer;
@Inject
private WebService webService;

@Override
public void initialize(ServiceExtensionContext context) {
var monitor = context.getMonitor();
var config = context.getConfig();

var policyController = new PolicyController(monitor, catalogService, transformer, config);

var negotiationController = new NegotiationController(consumerNegotiationManager,
contractNegotiationObservable, contractNegotiationStore, config);

var dataTransferController = new DataTransferController(monitor, config, webService,
authenticationService, transferProcessManager);

webService.registerResource(new ClientEndpoint(monitor, negotiationController, policyController,
dataTransferController));

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2021 Fraunhofer IOSB, eine rechtlich nicht selbstaendige
* Einrichtung der Fraunhofer-Gesellschaft zur Foerderung der angewandten
* Forschung e.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.fraunhofer.iosb.client.authentication;

import de.fraunhofer.iosb.client.ClientEndpoint;
import de.fraunhofer.iosb.client.dataTransfer.DataTransferEndpoint;
import jakarta.ws.rs.container.ContainerRequestContext;
import org.eclipse.edc.api.auth.spi.AuthenticationRequestFilter;
import org.eclipse.edc.api.auth.spi.AuthenticationService;
import org.eclipse.edc.spi.monitor.Monitor;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import static java.lang.String.format;

/**
* Custom AuthenticationRequestFilter filtering requests that go directly to an
* AAS service (managed by this extension) or the extension's configuration.
*/
public class CustomAuthenticationRequestFilter extends AuthenticationRequestFilter {

private final Monitor monitor;
private final Map<String, String> tempKeys;

public CustomAuthenticationRequestFilter(Monitor monitor, AuthenticationService authenticationService) {
super(authenticationService);
this.monitor = monitor;
tempKeys = new ConcurrentHashMap<>();
}

/**
* Add key,value pair for a request. This key will only be available for one
* request.
*
* @param key The key name
* @param value The actual key
*/
public void addTemporaryApiKey(String key, String value) {
tempKeys.put(key, value);
}

/**
* On automated data transfer: If the request is valid, the key,value pair used
* for this request will no longer be valid.
*/
@Override
public void filter(ContainerRequestContext requestContext) {
Objects.requireNonNull(requestContext);
var requestPath = requestContext.getUriInfo().getPath();

for (String key : tempKeys.keySet()) {
if (requestContext.getHeaders().containsKey(key)
&& requestContext.getHeaderString(key).equals(tempKeys.get(key))
&& requestPath.startsWith(
format("%s/%s", ClientEndpoint.AUTOMATED_PATH, DataTransferEndpoint.RECEIVE_DATA_PATH))) {
monitor.debug(
format("[Client] Data Transfer request with custom api key %s", key));
tempKeys.remove(key);
return;
}
}

super.filter(requestContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright (c) 2021 Fraunhofer IOSB, eine rechtlich nicht selbstaendige
* Einrichtung der Fraunhofer-Gesellschaft zur Foerderung der angewandten
* Forschung e.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.fraunhofer.iosb.client.dataTransfer;

import static java.lang.String.format;

import java.net.URL;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.eclipse.edc.api.auth.spi.AuthenticationService;
import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.web.spi.WebService;

import de.fraunhofer.iosb.client.authentication.CustomAuthenticationRequestFilter;

public class DataTransferController {

static final String DATA_TRANSFER_API_KEY = "data-transfer-api-key";

private static final int WAIT_FOR_TRANSFER_TIMEOUT_DEFAULT = 10;

private final Config config;

private final DataTransferEndpoint dataTransferEndpoint;
private final DataTransferObservable dataTransferObservable;
private final TransferInitiator transferInitiator;

private final CustomAuthenticationRequestFilter dataEndpointAuthenticationRequestFilter;

/**
* Class constructor
*
* @param monitor Logging.
* @param config Read config value transfer timeout and
* own URI
* @param webService Register data transfer endpoint.
* @param dataEndpointAuthRequestFilter Creating and passing through custom api
* keys for each data transfer.
* @param transferProcessManager Initiating a transfer process as a
* consumer.
*/
public DataTransferController(Monitor monitor, Config config, WebService webService,
AuthenticationService authenticationService, TransferProcessManager transferProcessManager) {
this.config = config;
this.transferInitiator = new TransferInitiator(config, monitor, transferProcessManager);
this.dataEndpointAuthenticationRequestFilter = new CustomAuthenticationRequestFilter(monitor,
authenticationService);

this.dataTransferObservable = new DataTransferObservable(monitor);
this.dataTransferEndpoint = new DataTransferEndpoint(monitor, dataTransferObservable);
webService.registerResource(dataTransferEndpoint);
}

/**
* Initiates the transfer process defined by the arguments. The data of the
* transfer will be sent to {@link DataTransferEndpoint#RECEIVE_DATA_PATH}.
*
* @param providerUrl The provider from whom the data is to be fetched.
* @param agreementId Non-null ContractAgreement of the negotiation process.
* @param assetId The asset to be fetched.
* @param dataSinkAddress HTTPDataAddress the result of the transfer should be
* sent to. (If null, send to extension and print in log)
*
* @return A completable future whose result will be the data or an error
* message.
* @throws InterruptedException If the data transfer was interrupted
* @throws ExecutionException If the data transfer process failed
*/
public String initiateTransferProcess(URL providerUrl, String agreementId, String assetId,
URL dataDestinationUrl) throws InterruptedException, ExecutionException {
// Prepare for incoming data
var dataFuture = new CompletableFuture<String>();
dataTransferObservable.register(dataFuture, agreementId);

if (Objects.isNull(dataDestinationUrl)) {
var apiKey = UUID.randomUUID().toString();
dataEndpointAuthenticationRequestFilter.addTemporaryApiKey(DATA_TRANSFER_API_KEY, apiKey);

this.transferInitiator.initiateTransferProcess(providerUrl, agreementId, assetId, apiKey);
return waitForData(dataFuture, agreementId);
} else {
var dataSinkAddress = HttpDataAddress.Builder.newInstance()
.baseUrl(dataDestinationUrl.toString())
.build();

this.transferInitiator.initiateTransferProcess(providerUrl, agreementId, assetId, dataSinkAddress);
return null;
}

}

private String waitForData(CompletableFuture<String> dataFuture, String agreementId)
throws InterruptedException, ExecutionException {
var waitForTransferTimeout = config.getInteger("getWaitForTransferTimeout",
WAIT_FOR_TRANSFER_TIMEOUT_DEFAULT);
try {
// Fetch TransferTimeout everytime to adapt to runtime config changes
var data = dataFuture.get(waitForTransferTimeout, TimeUnit.SECONDS);
dataTransferObservable.unregister(agreementId);
return data;
} catch (TimeoutException transferTimeoutExceededException) {
dataTransferObservable.unregister(agreementId);
throw new EdcException(format("Waiting for an transfer failed for agreementId: %s", agreementId),
transferTimeoutExceededException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.fraunhofer.iosb.app.client.dataTransfer;
package de.fraunhofer.iosb.client.dataTransfer;

import de.fraunhofer.iosb.app.Logger;
import de.fraunhofer.iosb.app.client.ClientEndpoint;
import de.fraunhofer.iosb.client.ClientEndpoint;
import jakarta.ws.rs.*;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import java.util.Objects;

import org.eclipse.edc.spi.monitor.Monitor;

import static java.lang.String.format;

/**
* Endpoint for automated data transfer
*/
@Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
@Produces({MediaType.APPLICATION_JSON})
@Consumes({ MediaType.APPLICATION_JSON, MediaType.WILDCARD })
@Produces({ MediaType.APPLICATION_JSON })
@Path(ClientEndpoint.AUTOMATED_PATH)
public class DataTransferEndpoint {

Expand All @@ -38,10 +39,11 @@ public class DataTransferEndpoint {
*/
public static final String RECEIVE_DATA_PATH = "receiveData";

private static final Logger LOGGER = Logger.getInstance();
private final Monitor monitor;
private final DataTransferObservable observable;

public DataTransferEndpoint(DataTransferObservable observable) {
public DataTransferEndpoint(Monitor monitor, DataTransferObservable observable) {
this.monitor = monitor;
this.observable = observable;
}

Expand All @@ -56,7 +58,7 @@ public DataTransferEndpoint(DataTransferObservable observable) {
@POST
@Path("receiveData/{agreement}")
public Response receiveData(@PathParam("agreement") String agreementId, String requestBody) {
LOGGER.log(format("Receiving data for agreement %s...", agreementId));
monitor.info(format("[Client] Receiving data for agreement %s...", agreementId));
Objects.requireNonNull(agreementId);
Objects.requireNonNull(requestBody);
observable.update(agreementId, requestBody);
Expand Down
Loading