Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update EDC version to v0.7.1 #120

Merged
merged 7 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public Response negotiateContract(@QueryParam("providerUrl") URL counterPartyUrl
.build();
}

return getData(counterPartyUrl, agreement.getId(), assetId, dataAddress);
return getData(counterPartyUrl, agreement.getId(), dataAddress);
}

/**
Expand Down Expand Up @@ -246,24 +246,20 @@ public Response negotiateContract(ContractRequest contractRequest) {
*
* @param providerUrl The data provider's url
* @param agreementId The basis of the data transfer.
* @param assetId The asset of which the data should be transferred
* @param dataAddress URL of destination data sink.
* @return On success, the data of the desired asset. Else, returns an error message.
*/
@POST
@Path(TRANSFER_PATH)
public Response getData(@QueryParam("providerUrl") URL providerUrl,
@QueryParam("agreementId") String agreementId,
@QueryParam("assetId") String assetId,
DataAddress dataAddress) {
monitor.debug(format("[Client] Received a %s GET request", TRANSFER_PATH));
Objects.requireNonNull(providerUrl, "providerUrl must not be null");
Objects.requireNonNull(agreementId, "agreementId must not be null");
Objects.requireNonNull(assetId, "assetId must not be null");

try {
var data = transferController.initiateTransferProcess(providerUrl, agreementId, assetId,
dataAddress);
var data = transferController.initiateTransferProcess(providerUrl, agreementId, dataAddress);
if (Objects.isNull(dataAddress)) {
return Response.ok(data).build();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.edc.connector.controlplane.services.spi.catalog.CatalogService;
import org.eclipse.edc.connector.controlplane.transfer.spi.TransferProcessManager;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.spi.system.Hostname;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
Expand All @@ -45,6 +46,8 @@ public class ClientExtension implements ServiceExtension {
@Inject
private ContractNegotiationStore contractNegotiationStore;
@Inject
private Hostname hostname;
@Inject
private TransferProcessManager transferProcessManager;
@Inject
private TypeTransformerRegistry transformer;
Expand All @@ -63,7 +66,7 @@ public void initialize(ServiceExtensionContext context) {

// This controller needs base config to read EDC's hostname + specific ports
var dataTransferController = new DataTransferController(monitor, context.getConfig(), webService,
publicApiManagementService, transferProcessManager);
publicApiManagementService, transferProcessManager, hostname);

webService.registerResource(new ClientEndpoint(monitor, negotiationController, policyController,
dataTransferController));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.edc.connector.controlplane.transfer.spi.TransferProcessManager;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.Hostname;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.web.spi.WebService;
Expand Down Expand Up @@ -60,8 +61,8 @@ public class DataTransferController {
* consumer.
*/
public DataTransferController(Monitor monitor, Config config, WebService webService,
PublicApiManagementService publicApiManagementService, TransferProcessManager transferProcessManager) {
this.transferInitiator = new TransferInitiator(config, monitor, transferProcessManager);
PublicApiManagementService publicApiManagementService, TransferProcessManager transferProcessManager, Hostname hostname) {
this.transferInitiator = new TransferInitiator(monitor, config, hostname, transferProcessManager);
this.config = config.getConfig("edc.client");
this.dataTransferEndpointManager = new DataTransferEndpointManager(publicApiManagementService);
this.dataTransferObservable = new DataTransferObservable(monitor);
Expand All @@ -75,27 +76,26 @@ public DataTransferController(Monitor monitor, Config config, WebService webServ
*
* @param providerUrl The provider from whom the data is to be fetched.
* @param agreementId Non-null ContractAgreement of the negotiation process.
* @param assetId The asset to be fetched.
* @param dataSinkAddress HTTPDataAddress the result of the transfer should be
* sent to. (If null, send to extension and print in log)
* @return A completable future whose result will be the data or an error message.
* @throws InterruptedException If the data transfer was interrupted
* @throws ExecutionException If the data transfer process failed
*/
public String initiateTransferProcess(URL providerUrl, String agreementId, String assetId,
DataAddress dataSinkAddress) throws InterruptedException, ExecutionException {
public String initiateTransferProcess(URL providerUrl, String agreementId, DataAddress dataSinkAddress)
throws InterruptedException, ExecutionException {
// Prepare for incoming data
var dataFuture = dataTransferObservable.register(agreementId);

if (Objects.isNull(dataSinkAddress)) {
var apiKey = UUID.randomUUID().toString();
dataTransferEndpointManager.addTemporaryEndpoint(agreementId, DATA_TRANSFER_API_KEY, apiKey);

this.transferInitiator.initiateTransferProcess(providerUrl, agreementId, assetId, apiKey);
this.transferInitiator.initiateTransferProcess(providerUrl, agreementId, apiKey);
return waitForData(dataFuture, agreementId);
} else {
// Send data to custom target url
this.transferInitiator.initiateTransferProcess(providerUrl, agreementId, assetId, dataSinkAddress);
this.transferInitiator.initiateTransferProcess(providerUrl, agreementId, dataSinkAddress);
// Don't have to wait for data
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,49 @@
package de.fraunhofer.iosb.client.datatransfer;

import de.fraunhofer.iosb.client.ClientEndpoint;
import jakarta.ws.rs.core.UriBuilder;
import jakarta.ws.rs.core.UriBuilderException;
import org.eclipse.edc.connector.controlplane.transfer.spi.TransferProcessManager;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferRequest;
import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.Hostname;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.spi.types.domain.DataAddress;

import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Objects;
import java.util.UUID;

import static de.fraunhofer.iosb.client.datatransfer.DataTransferController.DATA_TRANSFER_API_KEY;
import static java.lang.String.format;
import static java.lang.String.join;
import static org.eclipse.edc.protocol.dsp.http.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;
import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PUSH;

/**
* Initiate transfer requests
*/
class TransferInitiator {

private final TransferProcessManager transferProcessManager;
public static final String COULD_NOT_BUILD_URI_MESSAGE = "[Client] Could not build own URI, thus cannot transfer data to this EDC. Only data transfers to external endpoints are supported.";
public static final String HTTPS_KEYSTORE_PATH = "edc.web.https.keystore.path";
public static final String HTTP_PORT = "web.http.port";
public static final String HTTP_PATH = "web.http.path";

private final Monitor monitor;
private final TransferProcessManager transferProcessManager;
private final URI ownUri;

TransferInitiator(Config config, Monitor monitor,
TransferInitiator(Monitor monitor, Config config, Hostname hostname,
TransferProcessManager transferProcessManager) {
this.monitor = monitor;
this.ownUri = createOwnUriFromConfigurationValues(config);
this.transferProcessManager = transferProcessManager;
this.ownUri = createOwnUriFromConfigurationValues(config, hostname);
}

void initiateTransferProcess(URL providerUrl, String agreementId, String assetId, String apiKey) {
void initiateTransferProcess(URL providerUrl, String agreementId, String apiKey) {
if (Objects.isNull(ownUri)) {
monitor.warning(
"Cannot transfer to own EDC since own URI could not be built while initializing client extension. Not continuing...");
monitor.severe(COULD_NOT_BUILD_URI_MESSAGE);
return;
}
var dataDestination = HttpDataAddress.Builder.newInstance()
Expand All @@ -63,60 +67,40 @@ void initiateTransferProcess(URL providerUrl, String agreementId, String assetId
.addAdditionalHeader(DATA_TRANSFER_API_KEY, apiKey) // API key for validation on consumer side
.build();

initiateTransferProcess(providerUrl, agreementId, assetId, dataDestination);
initiateTransferProcess(providerUrl, agreementId, dataDestination);
}

void initiateTransferProcess(URL providerUrl, String agreementId, String assetId, DataAddress dataSinkAddress) {

void initiateTransferProcess(URL providerUrl, String agreementId, DataAddress dataSinkAddress) {
var transferRequest = TransferRequest.Builder.newInstance()
.id(UUID.randomUUID().toString()) // this is not relevant, thus can be random
.counterPartyAddress(providerUrl.toString()) // the address of the provider connector
.protocol(DATASPACE_PROTOCOL_HTTP)
.assetId(assetId)
.dataDestination(dataSinkAddress)
.counterPartyAddress(providerUrl.toString())
.contractId(agreementId)
.transferType(join("-", dataSinkAddress.getType(), PUSH.name()))
.dataDestination(dataSinkAddress)
.build();

var transferProcessStatus = transferProcessManager.initiateConsumerRequest(transferRequest);
if (transferProcessStatus.failed()) {
throw new EdcException(transferProcessStatus.getFailureDetail());
}
transferProcessManager
.initiateConsumerRequest(transferRequest)
.onFailure(failure -> monitor.severe(failure.getFailureDetail()));
}

private URI createOwnUriFromConfigurationValues(Config config) {
String protocolAddressString;
int ownPort;
String ownPath;
try {
protocolAddressString = config.getString("edc.dsp.callback.address");
ownPort = config.getInteger("web.http.port", -1);
ownPath = config.getString("web.http.path", null);
} catch (EdcException noSettingFound) {
monitor.severe(
format("[Client] Could not build own URI, thus cannot transfer data to this EDC. Only data transfers to external endpoints are supported. Exception message: %s",
noSettingFound.getMessage()));
return null;
}

// Remove /dsp from URL
protocolAddressString = protocolAddressString.substring(0, protocolAddressString.length() - "/dsp".length());
private URI createOwnUriFromConfigurationValues(Config config, Hostname hostname) {
try {
return UriBuilder
.fromUri(protocolAddressString)
.port(ownPort)
.path(format(
"%s/%s/%s",
ownPath,
ClientEndpoint.AUTOMATED_PATH,
DataTransferEndpoint.RECEIVE_DATA_PATH))
.build();
// HTTPS requires this value. With this configuration variable set, the connector will run with HTTPS enabled
var uriString = "%s://%s:%s%s/%s/%s".formatted(
config.getString(HTTPS_KEYSTORE_PATH, null) == null ? Protocol.HTTP.name() : Protocol.HTTPS.name(),
hostname.get(),
config.getInteger(HTTP_PORT),
config.getString(HTTP_PATH),
ClientEndpoint.AUTOMATED_PATH,
DataTransferEndpoint.RECEIVE_DATA_PATH);

} catch (IllegalArgumentException | UriBuilderException ownUriBuilderException) {
monitor.severe(
format("[Client] Could not build own URI, thus cannot transfer data to this EDC. Only data transfers to external endpoints are supported. Exception message: %s",
ownUriBuilderException.getMessage()));
return new URI(uriString);
} catch (URISyntaxException | EdcException couldNotBuildException) {
monitor.warning(COULD_NOT_BUILD_URI_MESSAGE, couldNotBuildException);
return null;
}
return null;
}

private enum Protocol { HTTP, HTTPS }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Optional;

import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_ACCESS_SERVICE_ATTRIBUTE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_DATA_SERVICE_ATTRIBUTE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCT_FORMAT_ATTRIBUTE;

Expand All @@ -42,7 +45,12 @@ public JsonObjectToDistributionTransformer() {
var builder = Distribution.Builder.newInstance();

transformMandatoryString(jsonObject.get(DCT_FORMAT_ATTRIBUTE), builder::format, context);
transformArrayOrObject(jsonObject.get(DCAT_DATA_SERVICE_ATTRIBUTE), DataService.class, builder::dataService, context);
transformArrayOrObject(
Optional.of(jsonObject.get(DCAT_ACCESS_SERVICE_ATTRIBUTE))
.orElse(jsonObject.get(DCAT_DATA_SERVICE_ATTRIBUTE)),
DataService.class,
builder::dataService,
context);

return builderResult(builder::build, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockserver.integration.ClientAndServer;

Expand Down Expand Up @@ -114,7 +115,8 @@ public void setup() throws IOException {
mockConfig(),
mock(WebService.class),
mock(PublicApiManagementService.class),
mockTransferProcessManager()));
mockTransferProcessManager(),
() -> "localhost"));
}

private Config mockConfig() {
Expand Down Expand Up @@ -198,10 +200,11 @@ public void negotiateContractAndTransferTest() {
}
}

@Disabled("Does not throw an exception anymore")
@Test
public void getDataTest() {
try {
clientEndpoint.getData(url, "test-agreement-id", "test-asset-id", AasDataAddress.Builder.newInstance().baseUrl(url.toString()).build());
clientEndpoint.getData(url, "test-agreement-id", AasDataAddress.Builder.newInstance().baseUrl(url.toString()).build());
fail();
} catch (EdcException expected) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.net.MalformedURLException;
Expand All @@ -47,10 +48,9 @@ public class TransferInitiatorTest {
@BeforeEach
@SuppressWarnings("unchecked")
void initializeContractOfferService() {
var configMock = ConfigFactory.fromMap(Map.of("edc.dsp.callback.address", "http://localhost:4321/dsp",
"web.http.port", "8080", "web.http.path", "/api"));
var configMock = ConfigFactory.fromMap(Map.of("web.http.port", "8080", "web.http.path", "/api"));

transferInitiator = new TransferInitiator(configMock, mock(Monitor.class), mockTransferProcessManager);
transferInitiator = new TransferInitiator(mock(Monitor.class), configMock, () -> "localhost", mockTransferProcessManager);

mockStatusResult = (StatusResult<TransferProcess>) mock(StatusResult.class);

Expand All @@ -62,25 +62,26 @@ void testInitiateTransferProcess() throws MalformedURLException {
when(mockStatusResult.failed()).thenReturn(false);

transferInitiator.initiateTransferProcess(new URL("http://provider-url:1234"), "test-agreement-id",
"test-asset", UUID.randomUUID().toString());
UUID.randomUUID().toString());
verify(mockTransferProcessManager, times(1)).initiateConsumerRequest(any());
}

@Test
void testInitiateTransferProcessCustomDataAddress() throws MalformedURLException {
when(mockStatusResult.failed()).thenReturn(false);
var dataSink = HttpDataAddress.Builder.newInstance().baseUrl("http://example.com").build();
transferInitiator.initiateTransferProcess(new URL("http://provider-url:1234"), "test-agreement-id",
"test-asset", dataSink);
transferInitiator.initiateTransferProcess(new URL("http://provider-url:1234"),
"test-agreement-id", dataSink);
verify(mockTransferProcessManager, times(1)).initiateConsumerRequest(any());
}

@Disabled("Does not throw an exception anymore")
@Test
void testInitiateTransferProcessThrowsEdcExceptionOnFailedTransferInitiation() throws MalformedURLException {
when(mockStatusResult.failed()).thenReturn(true);
try {
transferInitiator.initiateTransferProcess(new URL("http://provider-url:1234"), "test-agreement-id",
"test-asset", UUID.randomUUID().toString());
UUID.randomUUID().toString());
fail();
} catch (EdcException expected) {
}
Expand Down
Loading
Loading