Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes no data exchange possible #95

Merged
merged 5 commits into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ java {
}

dependencies {

// Centralized auth request filter
implementation(project(":public-api-management"))

// See this project's README.MD for explanations
implementation("$group:contract-core:$edcVersion")
implementation("$group:dsp-catalog-http-dispatcher:$edcVersion")
implementation("$group:management-api:$edcVersion")
implementation("$group:runtime-metamodel:$edcVersion")
implementation("$group:data-plane-http-spi:$edcVersion") // HttpDataAddress

implementation("jakarta.ws.rs:jakarta.ws.rs-api:${rsApi}")

testImplementation("$group:junit:$edcVersion")
Expand Down
45 changes: 21 additions & 24 deletions client/src/main/java/de/fraunhofer/iosb/client/ClientEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
/**
* Automated contract negotiation
*/
@Consumes({ MediaType.APPLICATION_JSON, MediaType.WILDCARD })
@Produces({ MediaType.APPLICATION_JSON })
@Consumes({MediaType.APPLICATION_JSON, MediaType.WILDCARD})
@Produces({MediaType.APPLICATION_JSON})
@Path(ClientEndpoint.AUTOMATED_PATH)
public class ClientEndpoint {
/*
Expand All @@ -70,15 +70,15 @@ public class ClientEndpoint {
/**
* Initialize a client endpoint.
*
* @param policyService Finds out policy for a given asset id and provider
* EDC url.
* @param negotiator Send contract offer, negotiation status watch.
* @param transferInitiator Initiate transfer requests.
* @param monitor Logging functionality
* @param negotiationController Send contract offer, negotiation status watch.
* @param policyController Provides API for accepted policy management and provider dataset retrieval.
* @param transferController Initiate transfer requests.
*/
public ClientEndpoint(Monitor monitor,
NegotiationController negotiationController,
PolicyController policyController,
DataTransferController transferController) {
NegotiationController negotiationController,
PolicyController policyController,
DataTransferController transferController) {
this.monitor = monitor;

this.policyController = policyController;
Expand All @@ -91,13 +91,10 @@ public ClientEndpoint(Monitor monitor,
* of the services' policyDefinitionStore instance containing user added
* policyDefinitions. If more than one policyDefinitions are provided by the
* provider connector, an AmbiguousOrNullException will be thrown.
*
*
* @param providerUrl Provider of the asset.
* @param assetId Asset ID of the asset whose contract should be fetched.
* @return One policyDefinition offered by the provider for the given assetId.
* @throws InterruptedException Thread for agreementId was waiting, sleeping, or
* otherwise occupied, and was
* interrupted.
*/
@GET
@Path(DATASET_PATH)
Expand All @@ -123,18 +120,18 @@ public Response getDataset(@QueryParam("providerUrl") URL providerUrl, @QueryPar
* Negotiate a contract agreement using the given contract offer if no agreement
* exists for this constellation.
*
* @param providerUrl Provider EDCs URL (DSP endpoint)
* @param providerId Provider EDCs ID
* @param assetId ID of the asset to be retrieved
* @param providerUrl Provider EDCs URL (DSP endpoint)
* @param providerId Provider EDCs ID
* @param assetId ID of the asset to be retrieved
* @param dataDestinationUrl URL of destination data sink.
* @return Asset data
*/
@POST
@Path(NEGOTIATE_PATH)
public Response negotiateContract(@QueryParam("providerUrl") URL providerUrl,
@QueryParam("providerId") String providerId,
@QueryParam("assetId") String assetId,
@QueryParam("dataDestinationUrl") URL dataDestinationUrl) {
@QueryParam("providerId") String providerId,
@QueryParam("assetId") String assetId,
@QueryParam("dataDestinationUrl") URL dataDestinationUrl) {
monitor.debug(format("[Client] Received a %s POST request", NEGOTIATE_PATH));
Objects.requireNonNull(providerUrl, "Provider URL must not be null");
Objects.requireNonNull(providerId, "Provider ID must not be null");
Expand Down Expand Up @@ -205,17 +202,17 @@ public Response negotiateContract(ContractRequest contractRequest) {
/**
* Submits a data transfer request to the providerUrl.
*
* @param providerUrl The data provider's url
* @param agreementId The basis of the data transfer.
* @param assetId The asset of which the data should be transferred
* @param providerUrl The data provider's url
* @param agreementId The basis of the data transfer.
* @param assetId The asset of which the data should be transferred
* @param dataDestinationUrl URL of destination data sink.
* @return On success, the data of the desired asset. Else, returns an error message.
*/
@GET
@Path(TRANSFER_PATH)
public Response getData(@QueryParam("providerUrl") URL providerUrl,
@QueryParam("agreementId") String agreementId, @QueryParam("assetId") String assetId,
@QueryParam("dataDestinationUrl") URL dataDestinationUrl) {
@QueryParam("agreementId") String agreementId, @QueryParam("assetId") String assetId,
@QueryParam("dataDestinationUrl") URL dataDestinationUrl) {
monitor.debug(format("[Client] Received a %s GET request", TRANSFER_PATH));
Objects.requireNonNull(providerUrl, "providerUrl must not be null");
Objects.requireNonNull(agreementId, "agreementId must not be null");
Expand Down
66 changes: 33 additions & 33 deletions client/src/main/java/de/fraunhofer/iosb/client/ClientExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package de.fraunhofer.iosb.client;

import org.eclipse.edc.api.auth.spi.AuthenticationService;
import de.fraunhofer.iosb.api.PublicApiManagementService;
import de.fraunhofer.iosb.client.dataTransfer.DataTransferController;
import de.fraunhofer.iosb.client.negotiation.NegotiationController;
import de.fraunhofer.iosb.client.policy.PolicyController;
import org.eclipse.edc.connector.contract.spi.negotiation.ConsumerContractNegotiationManager;
import org.eclipse.edc.connector.contract.spi.negotiation.observe.ContractNegotiationObservable;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
Expand All @@ -27,45 +30,42 @@
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.web.spi.WebService;

import de.fraunhofer.iosb.client.dataTransfer.DataTransferController;
import de.fraunhofer.iosb.client.negotiation.NegotiationController;
import de.fraunhofer.iosb.client.policy.PolicyController;

public class ClientExtension implements ServiceExtension {

@Inject
private AuthenticationService authenticationService;
@Inject
private CatalogService catalogService;
@Inject
private ConsumerContractNegotiationManager consumerNegotiationManager;
@Inject
private ContractNegotiationObservable contractNegotiationObservable;
@Inject
private ContractNegotiationStore contractNegotiationStore;
@Inject
private TransferProcessManager transferProcessManager;
@Inject
private TypeTransformerRegistry transformer;
@Inject
private WebService webService;
// Non-public unified authentication request filter management service
@Inject
private PublicApiManagementService publicApiManagementService;

@Override
public void initialize(ServiceExtensionContext context) {
var monitor = context.getMonitor();
var config = context.getConfig("edc.client");
@Inject
private CatalogService catalogService;
@Inject
private ConsumerContractNegotiationManager consumerNegotiationManager;
@Inject
private ContractNegotiationObservable contractNegotiationObservable;
@Inject
private ContractNegotiationStore contractNegotiationStore;
@Inject
private TransferProcessManager transferProcessManager;
@Inject
private TypeTransformerRegistry transformer;
@Inject
private WebService webService;

var policyController = new PolicyController(monitor, catalogService, transformer, config);
@Override
public void initialize(ServiceExtensionContext context) {
var monitor = context.getMonitor();
var config = context.getConfig("edc.client");

var negotiationController = new NegotiationController(consumerNegotiationManager,
contractNegotiationObservable, contractNegotiationStore, config);
var policyController = new PolicyController(monitor, catalogService, transformer, config);

var dataTransferController = new DataTransferController(monitor, config, webService,
authenticationService, transferProcessManager);
var negotiationController = new NegotiationController(consumerNegotiationManager,
contractNegotiationObservable, contractNegotiationStore, config);

webService.registerResource(new ClientEndpoint(monitor, negotiationController, policyController,
dataTransferController));
var dataTransferController = new DataTransferController(monitor, context.getConfig(), webService,
publicApiManagementService, transferProcessManager, context.getConnectorId());

}
webService.registerResource(new ClientEndpoint(monitor, negotiationController, policyController,
dataTransferController));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
*/
package de.fraunhofer.iosb.client.authentication;

import de.fraunhofer.iosb.api.PublicApiManagementService;
import de.fraunhofer.iosb.api.model.Endpoint;
import de.fraunhofer.iosb.api.model.HttpMethod;
import de.fraunhofer.iosb.client.ClientEndpoint;
import de.fraunhofer.iosb.client.dataTransfer.DataTransferEndpoint;
import jakarta.ws.rs.container.ContainerRequestContext;
import org.eclipse.edc.api.auth.spi.AuthenticationRequestFilter;
import org.eclipse.edc.api.auth.spi.AuthenticationService;
import org.eclipse.edc.spi.monitor.Monitor;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -32,49 +36,24 @@
* Custom AuthenticationRequestFilter filtering requests that go directly to an
* AAS service (managed by this extension) or the extension's configuration.
*/
public class CustomAuthenticationRequestFilter extends AuthenticationRequestFilter {
public class DataTransferEndpointManager {

private final Monitor monitor;
private final Map<String, String> tempKeys;
private final PublicApiManagementService publicApiManagementService;

public CustomAuthenticationRequestFilter(Monitor monitor, AuthenticationService authenticationService) {
super(authenticationService);
this.monitor = monitor;
tempKeys = new ConcurrentHashMap<>();
public DataTransferEndpointManager(PublicApiManagementService publicApiManagementService) {
this.publicApiManagementService = publicApiManagementService;
}

/**
* Add key,value pair for a request. This key will only be available for one
* request.
*
* @param key The key name
* @param value The actual key
*/
public void addTemporaryApiKey(String key, String value) {
tempKeys.put(key, value);
}

/**
* On automated data transfer: If the request is valid, the key,value pair used
* for this request will no longer be valid.
* @param agreementId Agreement to build the endpoint path suffix
* @param key The key name
* @param value The value
*/
@Override
public void filter(ContainerRequestContext requestContext) {
Objects.requireNonNull(requestContext);
var requestPath = requestContext.getUriInfo().getPath();

for (String key : tempKeys.keySet()) {
if (requestContext.getHeaders().containsKey(key)
&& requestContext.getHeaderString(key).equals(tempKeys.get(key))
&& requestPath.startsWith(
format("%s/%s", ClientEndpoint.AUTOMATED_PATH, DataTransferEndpoint.RECEIVE_DATA_PATH))) {
monitor.debug(
format("[Client] Data Transfer request with custom api key %s", key));
tempKeys.remove(key);
return;
}
}

super.filter(requestContext);
public void addTemporaryEndpoint(String agreementId, String key, String value) {
var endpointSuffix = ClientEndpoint.AUTOMATED_PATH + "/receiveData/" + agreementId;
publicApiManagementService.addTemporaryEndpoint(new Endpoint(endpointSuffix, HttpMethod.POST, Map.of(key, List.of(value))));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.eclipse.edc.api.auth.spi.AuthenticationService;
import de.fraunhofer.iosb.api.PublicApiManagementService;
import de.fraunhofer.iosb.client.authentication.DataTransferEndpointManager;
import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.web.spi.WebService;

import de.fraunhofer.iosb.client.authentication.CustomAuthenticationRequestFilter;

public class DataTransferController {

static final String DATA_TRANSFER_API_KEY = "data-transfer-api-key";
Expand All @@ -46,27 +45,26 @@ public class DataTransferController {
private final DataTransferObservable dataTransferObservable;
private final TransferInitiator transferInitiator;

private final CustomAuthenticationRequestFilter dataEndpointAuthenticationRequestFilter;
private final DataTransferEndpointManager dataTransferEndpointManager;

/**
* Class constructor
*
* @param monitor Logging.
* @param config Read config value transfer timeout and
* own URI
* @param webService Register data transfer endpoint.
* @param authenticationService Creating and passing through custom api
* keys for each data transfer.
* @param transferProcessManager Initiating a transfer process as a
* consumer.
* @param monitor Logging.
* @param config Read config value transfer timeout and
* own URI
* @param webService Register data transfer endpoint.
* @param publicApiManagementService Creating and passing through custom api
* keys for each data transfer.
* @param transferProcessManager Initiating a transfer process as a
* consumer.
* @param connectorId Connector ID for the provider to learn
*/
public DataTransferController(Monitor monitor, Config config, WebService webService,
AuthenticationService authenticationService, TransferProcessManager transferProcessManager) {
this.config = config;
this.transferInitiator = new TransferInitiator(config, monitor, transferProcessManager);
this.dataEndpointAuthenticationRequestFilter = new CustomAuthenticationRequestFilter(monitor,
authenticationService);

PublicApiManagementService publicApiManagementService, TransferProcessManager transferProcessManager, String connectorId) {
this.config = config.getConfig("edc.client");
this.transferInitiator = new TransferInitiator(config, monitor, transferProcessManager, connectorId);
this.dataTransferEndpointManager = new DataTransferEndpointManager(publicApiManagementService);
this.dataTransferObservable = new DataTransferObservable(monitor);
var dataTransferEndpoint = new DataTransferEndpoint(monitor, dataTransferObservable);
webService.registerResource(dataTransferEndpoint);
Expand All @@ -76,26 +74,24 @@ public DataTransferController(Monitor monitor, Config config, WebService webServ
* Initiates the transfer process defined by the arguments. The data of the
* transfer will be sent to {@link DataTransferEndpoint#RECEIVE_DATA_PATH}.
*
* @param providerUrl The provider from whom the data is to be fetched.
* @param agreementId Non-null ContractAgreement of the negotiation process.
* @param assetId The asset to be fetched.
* @param dataSinkAddress HTTPDataAddress the result of the transfer should be
* sent to. (If null, send to extension and print in log)
*
* @return A completable future whose result will be the data or an error
* message.
* @param providerUrl The provider from whom the data is to be fetched.
* @param agreementId Non-null ContractAgreement of the negotiation process.
* @param assetId The asset to be fetched.
* @param dataDestinationUrl HTTPDataAddress the result of the transfer should be
* sent to. (If null, send to extension and print in log)
* @return A completable future whose result will be the data or an error message.
* @throws InterruptedException If the data transfer was interrupted
* @throws ExecutionException If the data transfer process failed
*/
public String initiateTransferProcess(URL providerUrl, String agreementId, String assetId,
URL dataDestinationUrl) throws InterruptedException, ExecutionException {
URL dataDestinationUrl) throws InterruptedException, ExecutionException {
// Prepare for incoming data
var dataFuture = new CompletableFuture<String>();
dataTransferObservable.register(dataFuture, agreementId);

if (Objects.isNull(dataDestinationUrl)) {
var apiKey = UUID.randomUUID().toString();
dataEndpointAuthenticationRequestFilter.addTemporaryApiKey(DATA_TRANSFER_API_KEY, apiKey);
dataTransferEndpointManager.addTemporaryEndpoint(agreementId, DATA_TRANSFER_API_KEY, apiKey);

this.transferInitiator.initiateTransferProcess(providerUrl, agreementId, assetId, apiKey);
return waitForData(dataFuture, agreementId);
Expand All @@ -112,7 +108,7 @@ public String initiateTransferProcess(URL providerUrl, String agreementId, Strin

private String waitForData(CompletableFuture<String> dataFuture, String agreementId)
throws InterruptedException, ExecutionException {
var waitForTransferTimeout = config.getInteger("getWaitForTransferTimeout",
var waitForTransferTimeout = config.getInteger("waitForTransferTimeout",
WAIT_FOR_TRANSFER_TIMEOUT_DEFAULT);
try {
// Fetch TransferTimeout everytime to adapt to runtime config changes
Expand Down
Loading