Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
paullatzelsperger committed Nov 17, 2023
1 parent 3da3357 commit ce9d077
Show file tree
Hide file tree
Showing 16 changed files with 141 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private StatusResult<Void> checkExpiration(EndpointDataReferenceEntry entry) {
return StatusResult.success();
} else {
breakLease(entry);
return StatusResult.failure(ResponseStatus.FATAL_ERROR);
return StatusResult.failure(ResponseStatus.ERROR_RETRY, "Not yet expired.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.container.Suspended;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.StreamingOutput;
import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
Expand Down Expand Up @@ -107,9 +105,6 @@ public void requestAsset(AssetRequest request, @Suspended AsyncResponse response
.properties(properties)
.build();

// transfer the data asynchronously
var sink = new AsyncStreamingDataSink(consumer -> response.resume((StreamingOutput) consumer::accept), executorService, monitor);

try {
dataPlaneManager.transfer(flowRequest).whenComplete((result, throwable) -> handleCompletion(response, result, throwable));
} catch (Exception e) {
Expand Down Expand Up @@ -173,6 +168,9 @@ private void handleCompletion(AsyncResponse response, StreamResult<Object> resul
} else if (throwable != null) {
reportError(response, throwable);
}
if (result.succeeded()) {
response.resume(result.getContent());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink;
import org.eclipse.edc.junit.annotations.ApiTest;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
Expand Down Expand Up @@ -78,6 +77,7 @@ void requestAsset_shouldReturnData_withAssetId() throws IOException {
.id(transferProcessId)
.authKey("authKey")
.authCode("authCode")
.contractId("contract-id")
.endpoint(url)
.build();

Expand All @@ -92,10 +92,7 @@ void requestAsset_shouldReturnData_withAssetId() throws IOException {

when(cache.referencesForAsset(assetId, null)).thenReturn(List.of(edr));
when(dataPlaneManager.transfer(any()))
.thenAnswer(a -> {
AsyncStreamingDataSink sink = a.getArgument(0);
return sink.transfer(datasource);
});
.thenAnswer(a -> CompletableFuture.completedFuture(StreamResult.success(response)));

var proxyResponseBytes = baseRequest()
.contentType(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -124,6 +121,7 @@ void requestAsset_shouldReturnError_WhenProxyCallFails(StreamResult<Object> resu
.authKey("authKey")
.authCode("authCode")
.endpoint(url)
.contractId("contract-id")
.build();

when(cache.referencesForAsset(assetId, null)).thenReturn(List.of(edr));
Expand Down Expand Up @@ -169,6 +167,7 @@ void requestAsset_shouldReturnError_whenMultipleEdrsByAssetIdFound() {
.id(UUID.randomUUID().toString())
.authKey("authKey")
.authCode("authCode")
.contractId("contract-id")
.endpoint(url)
.build();

Expand All @@ -194,6 +193,7 @@ void requestAsset_shouldReturnData_withTransferProcessId() throws IOException {
.id(transferProcessId)
.authKey("authKey")
.authCode("authCode")
.contractId("contract-id")
.endpoint(url)
.build();

Expand All @@ -208,10 +208,7 @@ void requestAsset_shouldReturnData_withTransferProcessId() throws IOException {

when(cache.resolveReference(transferProcessId)).thenReturn(edr);
when(dataPlaneManager.transfer(any()))
.thenAnswer(a -> {
AsyncStreamingDataSink sink = a.getArgument(0);
return sink.transfer(datasource);
});
.thenAnswer(a -> CompletableFuture.completedFuture(StreamResult.success(response)));

var proxyResponseBytes = baseRequest()
.contentType(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -256,6 +253,7 @@ void requestAsset_shouldReturnData_withDataPlaneUrl() throws IOException {
.id(transferProcessId)
.authKey("authKey")
.authCode("authCode")
.contractId("contract-id")
.endpoint(url)
.build();

Expand All @@ -271,8 +269,7 @@ void requestAsset_shouldReturnData_withDataPlaneUrl() throws IOException {
when(cache.resolveReference(transferProcessId)).thenReturn(edr);
when(dataPlaneManager.transfer(any()))
.thenAnswer(a -> {
AsyncStreamingDataSink sink = a.getArgument(0);
return sink.transfer(datasource);
return CompletableFuture.completedFuture(StreamResult.success(response));
});

var proxyResponseBytes = baseRequest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ public record EndpointDataReferenceEntrySchema(
public static final String ENDPOINT_DATA_REFERENCE_ENTRY_EXAMPLE = """
{
"@type": "tx:EndpointDataReferenceEntry",
"edc:agreementId": "MQ==:MQ==:ZTY3MzQ4YWEtNTdmZC00YzA0LTg2ZmQtMGMxNzk0MWM3OTkw",
"edc:transferProcessId": "78a66945-d638-4c0a-be71-b35a0318a410",
"edc:assetId": "1",
"edc:providerId": "BPNL00DATAP00001",
"agreementId": "MQ==:MQ==:ZTY3MzQ4YWEtNTdmZC00YzA0LTg2ZmQtMGMxNzk0MWM3OTkw",
"transferProcessId": "78a66945-d638-4c0a-be71-b35a0318a410",
"assetId": "1",
"providerId": "BPNL00DATAP00001",
"tx:edrState": "NEGOTIATED",
"tx:expirationDate": 1690811364000,
"@context": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,16 @@ void edrEntryExample() throws JsonProcessingException {
.isEqualTo(jsonObject.getString("tx:edrState"));

assertThat(first(content, EDR_ENTRY_ASSET_ID).getJsonString(VALUE).getString())
.isEqualTo(jsonObject.getString("edc:assetId"));
.isEqualTo(jsonObject.getString("assetId"));

assertThat(first(content, EDR_ENTRY_AGREEMENT_ID).getJsonString(VALUE).getString())
.isEqualTo(jsonObject.getString("edc:agreementId"));
.isEqualTo(jsonObject.getString("agreementId"));

assertThat(first(content, EDR_ENTRY_TRANSFER_PROCESS_ID).getJsonString(VALUE).getString())
.isEqualTo(jsonObject.getString("edc:transferProcessId"));
.isEqualTo(jsonObject.getString("transferProcessId"));

assertThat(first(content, EDR_ENTRY_PROVIDER_ID).getJsonString(VALUE).getString())
.isEqualTo(jsonObject.getString("edc:providerId"));
.isEqualTo(jsonObject.getString("providerId"));

assertThat(first(content, EDR_ENTRY_EXPIRATION_DATE).getJsonNumber(VALUE).longValue())
.isEqualTo(jsonObject.getJsonNumber("tx:expirationDate").longValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ void initEdrNegotiation_shouldReturnError_whenNotFound() {
@Test
void getEdr_shouldReturnDataAddress_whenFound() {
var transferProcessId = "id";
var edr = EndpointDataReference.Builder.newInstance().endpoint("test").id(transferProcessId).build();
var edr = EndpointDataReference.Builder.newInstance().endpoint("test")
.contractId("test-contract-id")
.id(transferProcessId).build();
var response = Json.createObjectBuilder()
.add(DataAddress.EDC_DATA_ADDRESS_TYPE_PROPERTY, EndpointDataReference.EDR_SIMPLE_TYPE)
.add(EndpointDataReference.ENDPOINT, edr.getEndpoint())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
--
-- Copyright (c) 2022 Mercedes-Benz Tech Innovation GmbH
--
-- This program and the accompanying materials are made available under the
-- terms of the Apache License, Version 2.0 which is available at
-- https://www.apache.org/licenses/LICENSE-2.0
--
-- SPDX-License-Identifier: Apache-2.0
--
-- Contributors:
-- Mercedes-Benz Tech Innovation GmbH - EDC Snapshot 20220815 Update
--

-- add columns
ALTER TABLE edc_asset
ADD COLUMN properties JSON;

ALTER TABLE edc_asset
ADD COLUMN private_properties JSON;

ALTER TABLE edc_asset
ADD COLUMN data_address JSON;

-- update data address, move all JSON data into the edc_asset table
UPDATE edc_asset
SET data_address = (SELECT properties FROM edc_asset_dataaddress WHERE asset_id_fk = a.asset_id)::json
FROM edc_asset as a
WHERE edc_asset.asset_id = a.asset_id;


-- update properties, move all JSON data from the edc_asset_properties table
UPDATE edc_asset
SET private_properties = (SELECT json_agg(json_build_object('property_name', prop.property_name, 'property_value',
prop.property_value, 'property_type', prop.property_type))
FROM edc_asset_property prop
WHERE asset_id_fk = a.asset_id
AND prop.property_is_private = true)
FROM edc_asset as a
WHERE edc_asset.asset_id = a.asset_id;

-- update private properties, move all JSON data from the edc_asset_properties table
UPDATE edc_asset
SET properties = (SELECT json_agg(json_build_object('property_name', prop.property_name, 'property_value',
prop.property_value, 'property_type', prop.property_type))
FROM edc_asset_property prop
WHERE asset_id_fk = a.asset_id
AND prop.property_is_private = false)
FROM edc_asset as a
WHERE edc_asset.asset_id = a.asset_id;


-- do NOT drop edc_asset_dataaddress and edc_asset_property to enable further data migration scripts


Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
--
-- Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
--
-- This program and the accompanying materials are made available under the
-- terms of the Apache License, Version 2.0 which is available at
-- https://www.apache.org/licenses/LICENSE-2.0
--
-- SPDX-License-Identifier: Apache-2.0
--
-- Contributors:
-- Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
--

-- add columns
ALTER TABLE edc_contract_definitions
ADD COLUMN private_properties JSON;
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class CatalogHelperFunctions {
public static JsonObject createCatalogRequest(JsonObject query, String dspEndpoint) {
var jsonBuilder = Json.createObjectBuilder();
jsonBuilder.add("@type", "CatalogRequest");
jsonBuilder.add(EDC_NAMESPACE + "providerUrl", dspEndpoint);
jsonBuilder.add(EDC_NAMESPACE + "counterPartyAddress", dspEndpoint);
jsonBuilder.add(EDC_NAMESPACE + "protocol", "dataspace-protocol-http");

if (query != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ public class EdrNegotiationHelperFunctions {
public static JsonObject createEdrNegotiationRequest(String connectorAddress, String providerId, String offerId, String assetId, JsonObject policy, JsonArray callbacks) {
return Json.createObjectBuilder()
.add(TYPE, NegotiateEdrRequestDto.EDR_REQUEST_DTO_TYPE)
.add(EDC_NAMESPACE + "connectorId", providerId)
.add(EDC_NAMESPACE + "counterPartyId", providerId)
.add(EDC_NAMESPACE + "providerId", providerId)
.add(EDC_NAMESPACE + "connectorAddress", connectorAddress)
.add(EDC_NAMESPACE + "counterPartyAddress", connectorAddress)
.add(EDC_NAMESPACE + "protocol", DATASPACE_PROTOCOL_HTTP)
.add(EDC_NAMESPACE + "offer", Json.createObjectBuilder()
.add(EDC_NAMESPACE + "offerId", offerId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public String getNegotiationState(String negotiationId) {
.get("/v2/contractnegotiations/{id}/state", negotiationId)
.then()
.statusCode(200)
.extract().body().jsonPath().getString("'edc:state'");
.extract().body().jsonPath().getString("'state'");
}

public String getContractAgreementId(String negotiationId) {
Expand Down Expand Up @@ -293,7 +293,7 @@ public String getTransferProcessState(String id) {
.get("/v2/transferprocesses/{id}/state", id)
.then()
.statusCode(200)
.extract().body().jsonPath().getString("'edc:state'");
.extract().body().jsonPath().getString("'state'");
}

public EndpointDataReference getDataReference(String dataRequestId) {
Expand Down Expand Up @@ -437,7 +437,7 @@ private String getContractNegotiationField(String negotiationId, String fieldNam
.then()
.statusCode(200)
.extract().body().jsonPath()
.getString(format("'edc:%s'", fieldName));
.getString(format("'%s'", fieldName));
}

private String getProxyData(Map<String, String> body) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void negotiateEdr_shouldRemoveExpiredEdrs() throws IOException {
var edrCaches = SOKRATES.getEdrEntriesByAssetId(assetId);
var localExpired = edrCaches.stream()
.filter(json -> json.asJsonObject().getJsonString("tx:edrState").getString().equals(EXPIRED.name()))
.map(json -> json.asJsonObject().getJsonString("edc:transferProcessId").getString())
.map(json -> json.asJsonObject().getJsonString("transferProcessId").getString())
.toList();
assertThat(localExpired).hasSizeGreaterThan(0);
expired.add(localExpired.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,21 @@ void negotiateEdr_shouldInvokeCallbacks() throws IOException {

assertThat(edrCaches).hasSize(1);

var transferProcessId = edrCaches.get(0).asJsonObject().getString("edc:transferProcessId");
var cnId = edrCaches.get(0).asJsonObject().getString("edc:contractNegotiationId");
var agreementId = edrCaches.get(0).asJsonObject().getString("edc:agreementId");
var transferProcessId = edrCaches.get(0).asJsonObject().getString("transferProcessId");
var cnId = edrCaches.get(0).asJsonObject().getString("contractNegotiationId");
var agreementId = edrCaches.get(0).asJsonObject().getString("agreementId");

assertThat(cnId).isEqualTo(contractNegotiationId);
assertThat(SOKRATES.getEdrEntriesByAgreementId(agreementId)).hasSize(1);


var edr = SOKRATES.getEdr(transferProcessId);

assertThat(edr.getJsonString("edc:type").getString()).isEqualTo(EDR_SIMPLE_TYPE);
assertThat(edr.getJsonString("edc:authCode").getString()).isNotNull();
assertThat(edr.getJsonString("edc:authKey").getString()).isNotNull();
assertThat(edr.getJsonString("edc:endpoint").getString()).isNotNull();
assertThat(edr.getJsonString("edc:id").getString()).isEqualTo(transferProcessId);
assertThat(edr.getJsonString("type").getString()).isEqualTo(EDR_SIMPLE_TYPE);
assertThat(edr.getJsonString("authCode").getString()).isNotNull();
assertThat(edr.getJsonString("authKey").getString()).isNotNull();
assertThat(edr.getJsonString("endpoint").getString()).isNotNull();
assertThat(edr.getJsonString("id").getString()).isEqualTo(transferProcessId);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void negotiateEdr_shouldRenewTheEdr() throws IOException {

var transferProcessId = edrCaches.stream()
.filter(json -> json.asJsonObject().getJsonString("tx:edrState").getString().equals(EXPIRED.name()))
.map(json -> json.asJsonObject().getJsonString("edc:transferProcessId").getString())
.map(json -> json.asJsonObject().getJsonString("transferProcessId").getString())
.findFirst()
.orElseThrow();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,14 @@ void verify_end2EndFlows() throws IOException {
var tpSpec = createSpecification(format(REQUEST_TEMPLATE_TP, SINGLE_TRANSFER_ID, PRODUCER_HTTP_PORT));

// verify content successfully proxied using a transfer process id
tpSpec.with()
var rs = tpSpec.with()
.post(PROXY_SUBPATH)
.then()
.log().ifError()
.assertThat().statusCode(200)
.assertThat().body(is(MOCK_ENDPOINT_200_BODY));
.body(is(MOCK_ENDPOINT_200_BODY));

var str = rs.extract().body().asString();

// verify content successfully proxied using an asset id for the case where only one active transfer process exists for the asset
var assetSpec = createSpecification(format(REQUEST_TEMPLATE_ASSET, SINGLE_ASSET_ID, PRODUCER_HTTP_PORT));
Expand Down
Loading

0 comments on commit ce9d077

Please sign in to comment.