Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: simplify negotiation and transfer handling by switching to the …
Browse files Browse the repository at this point in the history
…edr API.
drcgjung committed Dec 18, 2023
1 parent 3bba2bb commit 7f5bc56
Showing 40 changed files with 144 additions and 436 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -137,7 +137,7 @@ jobs:
type=semver,pattern={{version}}
type=semver,pattern={{major}}
type=semver,pattern={{major}}.{{minor}}
type=raw,value=1.10.15-SNAPSHOT,enable=${{ github.event.inputs.deploy_docker == 'true' || github.ref == format('refs/heads/{0}', 'main') }}
type=raw,value=1.11.16-SNAPSHOT,enable=${{ github.event.inputs.deploy_docker == 'true' || github.ref == format('refs/heads/{0}', 'main') }}
type=raw,value=latest,enable=${{ github.ref == format('refs/heads/{0}', 'main') }}
- name: Agent Plane Hashicorp Container Build and push
@@ -175,7 +175,7 @@ jobs:
type=semver,pattern={{version}}
type=semver,pattern={{major}}
type=semver,pattern={{major}}.{{minor}}
type=raw,value=1.10.15-SNAPSHOT,enable=${{ github.event.inputs.deploy_docker == 'true' || github.ref == format('refs/heads/{0}', 'main') }}
type=raw,value=1.11.16-SNAPSHOT,enable=${{ github.event.inputs.deploy_docker == 'true' || github.ref == format('refs/heads/{0}', 'main') }}
type=raw,value=latest,enable=${{ github.ref == format('refs/heads/{0}', 'main') }}
- name: Agent Plane Azure Vault Container Build and push
4 changes: 2 additions & 2 deletions agent-plane/README.md
Original file line number Diff line number Diff line change
@@ -66,10 +66,10 @@ mvn package -Pwith-docker-image
Alternatively, after a successful build, you can invoke docker yourself

```console
docker build -t tractusx/agentplane-azure-vault:1.10.15-SNAPSHOT -f agentplane-azure-vault/src/main/docker/Dockerfile .
docker build -t tractusx/agentplane-azure-vault:1.11.16-SNAPSHOT -f agentplane-azure-vault/src/main/docker/Dockerfile .
```

```console
docker build -t tractusx/agentplane-hashicorp:1.10.15-SNAPSHOT -f agentplane-hashicorp/src/main/docker/Dockerfile .
docker build -t tractusx/agentplane-hashicorp:1.11.16-SNAPSHOT -f agentplane-hashicorp/src/main/docker/Dockerfile .
```

2 changes: 1 addition & 1 deletion agent-plane/agent-plane-protocol/README.md
Original file line number Diff line number Diff line change
@@ -63,7 +63,7 @@ Add the following dependency to your data-plane artifact pom:
<dependency>
<groupId>org.eclipse.tractusx.agents.edc</groupId>
<artifactId>agent-plane-protocol</artifactId>
<version>1.10.15-SNAPSHOT</version>
<version>1.11.16-SNAPSHOT</version>
</dependency>
```

2 changes: 1 addition & 1 deletion agent-plane/agent-plane-protocol/pom.xml
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@
<parent>
<groupId>org.eclipse.tractusx.agents.edc</groupId>
<artifactId>agent-plane</artifactId>
<version>1.10.15-SNAPSHOT</version>
<version>1.11.16-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Original file line number Diff line number Diff line change
@@ -31,8 +31,6 @@ web.http.default.port=8082
web.http.default.path=/api
web.http.public.port=8185
web.http.public.path=/api/v1/public
web.http.callback.port=8187
web.http.callback.path=/callback
web.http.control.port=9999
web.http.control.path=/api/dataplane/control
edc.web.rest.cors.enabled=true
@@ -63,7 +61,6 @@ edc.dataplane.token.validation.endpoints.consumer=http://consuming-control-plane
cx.agent.controlplane.protocol=http://consuming-control-plane:8282
cx.agent.controlplane.management=http://consuming-control-plane:8181/management
cx.agent.controlplane.management.provider=http://providing-control-plane:8181/management
cx.agent.callback=http://agent-plane:8187/callback/endpoint-data-reference
cx.agent.skill.contract.default=Contract?partner=Skill
cx.agent.edc.version=0.5.1

Original file line number Diff line number Diff line change
@@ -72,8 +72,6 @@ public class AgentConfig {
public static String READ_TIMEOUT_PROPERTY = "cx.agent.read.timeout";
public static int DEFAULT_READ_TIMEOUT=1080000;

public static String CALLBACK_ENDPOINT="cx.agent.callback";

public static String DEFAULT_SKILL_CONTRACT_PROPERTY = "cx.agent.skill.contract.default";

public static String SERVICE_ALLOW_PROPERTY = "cx.agent.service.allow";
@@ -118,13 +116,6 @@ public AgentConfig(Monitor monitor, Config config) {
serviceAssetDenyPattern=Pattern.compile(config.getString(SERVICE_DENY_ASSET_PROPERTY,DEFAULT_SERVICE_DENY_ASSET_PATTERN));
}

/**
* @return callback endpoint
*/
public String getCallbackEndpoint() {
return config.getString(CALLBACK_ENDPOINT);
}

/**
* @return the name of the default asset/graph
*/
Original file line number Diff line number Diff line change
@@ -62,7 +62,6 @@ public class AgentExtension implements ServiceExtension {
* static constants
*/
protected static final String DEFAULT_CONTEXT_ALIAS = "default";
protected static final String CALLBACK_CONTEXT_ALIAS = "callback";
public static Pattern GRAPH_PATTERN=Pattern.compile("((?<url>[^#]+)#)?(?<graph>.*Graph(Asset)?.*)");
public static Pattern SKILL_PATTERN=Pattern.compile("((?<url>[^#]+)#)?(?<skill>.*Skill(Asset)?.*)");

@@ -126,8 +125,6 @@ public void initialize(ServiceExtensionContext context) {
DataManagement catalogService=new DataManagement(monitor,typeManager,httpClient,config);

AgreementController agreementController=new AgreementController(monitor,config,catalogService);
monitor.debug(String.format("Registering agreement controller %s",agreementController));
webService.registerResource(CALLBACK_CONTEXT_ALIAS, agreementController);

RDFStore rdfStore=new RDFStore(config,monitor);

Original file line number Diff line number Diff line change
@@ -19,11 +19,8 @@
import com.nimbusds.jose.JWSObject;
import jakarta.json.JsonValue;
import jakarta.ws.rs.*;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.callback.CallbackAddress;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.agents.edc.jsonld.JsonLd;
import org.eclipse.tractusx.agents.edc.model.*;
@@ -37,8 +34,6 @@
/**
* An endpoint/service that receives information from the control plane
*/
@Consumes({MediaType.APPLICATION_JSON})
@Path("/endpoint-data-reference")
public class AgreementController implements IAgreementController {

/**
@@ -61,13 +56,7 @@ public class AgreementController implements IAgreementController {
// hosts all pending processes
protected final Set<String> activeAssets = new HashSet<>();
// any contract agreements indexed by asset
protected final Map<String, ContractAgreement> agreementStore = new HashMap<>();
// any transfer processes indexed by asset, the current process should
// always adhere to the above agreement
protected final Map<String, TransferProcess> processStore = new HashMap<>();
// at the end of provisioning and endpoint reference will be set
// that fits to the current transfer process
protected final Map<String, EndpointDataReference> endpointStore = new HashMap<>();
protected final Map<String, EndpointDataReference> agreementStore = new HashMap<>();

/**
* creates an agreement controller
@@ -90,29 +79,6 @@ public String toString() {
return super.toString() + "/endpoint-data-reference";
}

/**
* this is called by the control plane when an agreement has been made
*
* @param dataReference contains the actual call token
*/
@POST
public void receiveEdcCallback(EndpointDataReference dataReference) {
var agreementId = dataReference.getId();
monitor.debug(String.format("An endpoint data reference for agreement %s has been posted.", agreementId));
synchronized (agreementStore) {
for (Map.Entry<String, TransferProcess> process : processStore.entrySet()) {
if (process.getValue().getId().equals(agreementId)) {
synchronized (endpointStore) {
monitor.debug(String.format("Agreement %s belongs to asset %s.", agreementId, process.getKey()));
endpointStore.put(process.getKey(), dataReference);
return;
}
}
}
}
monitor.debug(String.format("Agreement %s has no active asset. Guess that came for another plane. Ignoring.", agreementId));
}

/**
* accesses an active endpoint for the given asset
*
@@ -126,8 +92,8 @@ public EndpointDataReference get(String assetId) {
monitor.debug(String.format("Asset %s is not active", assetId));
return null;
}
synchronized (endpointStore) {
EndpointDataReference result = endpointStore.get(assetId);
synchronized (agreementStore) {
EndpointDataReference result = agreementStore.get(assetId);
if (result != null) {
String token = result.getAuthCode();
if (token != null) {
@@ -136,27 +102,18 @@ public EndpointDataReference get(String assetId) {
Object expiryObject=jwt.getPayload().toJSONObject().get("exp");
if(expiryObject instanceof Long) {
// token times are in seconds
if(!new Date((Long) expiryObject*1000).before(new Date(System.currentTimeMillis() + 30 * 1000))) {
if (!new Date((Long) expiryObject*1000).before(new Date(System.currentTimeMillis() + 30 * 1000))) {
return result;
}
}
} catch(ParseException | NumberFormatException e) {
} catch (ParseException | NumberFormatException e) {
monitor.debug(String.format("Active asset %s has invalid agreement token.", assetId));
}
}
endpointStore.remove(assetId);
agreementStore.remove(assetId);
}
monitor.debug(String.format("Active asset %s has timed out or was not installed.", assetId));
synchronized (processStore) {
processStore.remove(assetId);
synchronized (agreementStore) {
ContractAgreement agreement = agreementStore.get(assetId);
if (agreement != null && agreement.getContractSigningDate()+600000L <= System.currentTimeMillis()) {
agreementStore.remove(assetId);
}
activeAssets.remove(assetId);
}
}
activeAssets.remove(assetId);
}
}
return null;
@@ -186,30 +143,27 @@ protected void deactivate(String asset) {
synchronized (agreementStore) {
agreementStore.remove(asset);
}
synchronized (processStore) {
processStore.remove(asset);
}
}

/**
* register an agreement
*
* @param asset name
* @param agreement object
*/
protected void registerAgreement(String asset, ContractAgreement agreement) {
protected EndpointDataReference registerAgreement(String asset, ContractAgreement agreement, Map<String, JsonValue> assetProperties) {
synchronized (agreementStore) {
agreementStore.put(asset, agreement);
}
}

/**
* register a process
* @param asset name
* @param process object
*/
protected void registerProcess(String asset, TransferProcess process) {
synchronized (processStore) {
processStore.put(asset, process);
var edrBuilder = EndpointDataReference.Builder.newInstance();
edrBuilder.authCode(agreement.getAuthCode());
edrBuilder.authKey(agreement.getAuthKey());
edrBuilder.endpoint(agreement.getEndpoint());
edrBuilder.id(agreement.getCId());
var edr = edrBuilder.build();
for (Map.Entry<String,JsonValue> prop : assetProperties.entrySet()) {
edr.getProperties().put(prop.getKey(), JsonLd.asString(prop.getValue()));
}
agreementStore.put(asset, edr);
return edr;
}
}

@@ -282,10 +236,10 @@ public EndpointDataReference createAgreement(String remoteUrl, String asset) thr
try {
while ((System.currentTimeMillis() - startTime < config.getNegotiationTimeout())
&& (negotiation == null ||
(!negotiation.getState().equals("FINALIZED") && !negotiation.getState().equals("TERMINATED")))) {
(!negotiation.getState().equals("NEGOTIATED") && !negotiation.getState().equals("TERMINATED")))) {
Thread.sleep(config.getNegotiationPollInterval());
negotiation = dataManagement.getNegotiation(
negotiationId
asset
);
}
} catch (InterruptedException e) {
@@ -294,7 +248,7 @@ public EndpointDataReference createAgreement(String remoteUrl, String asset) thr
monitor.warning(String.format("Negotiation thread for asset %s negotiation %s run into problem. Giving up.", asset, negotiationId),e);
}

if (negotiation == null || !negotiation.getState().equals("FINALIZED")) {
if (negotiation == null || !negotiation.getState().equals("NEGOTIATED")) {
deactivate(asset);
if(negotiation!=null) {
String errorDetail=negotiation.getErrorDetail();
@@ -305,105 +259,23 @@ public EndpointDataReference createAgreement(String remoteUrl, String asset) thr
throw new InternalServerErrorException(String.format("Contract Negotiation %s for asset %s was not successful.", negotiationId, asset));
}

monitor.debug(String.format("About to check agreement %s for contract offer %s (for asset %s at connector %s)",negotiation.getContractAgreementId(),offerId,asset,remoteUrl));
monitor.debug(String.format("About to check edr %s for contract offer %s (for asset %s at connector %s)",negotiation.getContractAgreementId(),offerId,asset,remoteUrl));

ContractAgreement agreement;

try {
agreement=dataManagement.getAgreement(negotiation.getContractAgreementId());
agreement=dataManagement.getEdr(negotiation.getTransferProcessId());
} catch(IOException ioe) {
deactivate(asset);
throw new InternalServerErrorException(String.format("Error when retrieving agreement %s for negotiation %s.",negotiation.getContractAgreementId(),negotiationId),ioe);
}

if (agreement == null || !agreement.getAssetId().endsWith(asset)) {
if (agreement == null) {
deactivate(asset);
throw new InternalServerErrorException(String.format("Agreement %s does not refer to asset %s.", negotiation.getContractAgreementId(), asset));
}

registerAgreement(asset,agreement);

DataAddress dataDestination = DataAddress.Builder.newInstance()
.type(TRANSFER_TYPE)
.build();

CallbackAddress address=
CallbackAddress.Builder.newInstance().uri(config.getCallbackEndpoint()).build();

TransferRequest transferRequest = TransferRequest.Builder.newInstance()
.assetId(asset)
.contractId(agreement.getId())
.connectorId(config.getBusinessPartnerNumber())
.connectorAddress(String.format(DataManagement.DSP_PATH, remoteUrl))
.protocol("dataspace-protocol-http")
.dataDestination(dataDestination)
.managedResources(false)
.callbackAddresses(List.of(address))
.build();

monitor.debug(String.format("About to initiate transfer for agreement %s (for asset %s at connector %s)",negotiation.getContractAgreementId(),asset,remoteUrl));

String transferId;

try {
transferId=dataManagement.initiateHttpProxyTransferProcess(transferRequest);
} catch(IOException ioe) {
deactivate(asset);
throw new InternalServerErrorException(String.format("HttpProxy transfer for agreement %s could not be initiated.", agreement.getId()),ioe);
}

monitor.debug(String.format("About to check transfer %s (for asset %s at connector %s)",transferId,asset,remoteUrl));

// Check negotiation state
TransferProcess process = null;

startTime = System.currentTimeMillis();

// EDC 0.5.1 has a problem with the checker configuration and wont process to COMPLETED
String expectedTransferState = config.isPrerelease() ? "COMPLETED" : "STARTED";

try {
while ((System.currentTimeMillis() - startTime < config.getNegotiationTimeout()) && (process == null || !process.getState().equals(expectedTransferState))) {
Thread.sleep(config.getNegotiationPollInterval());
process = dataManagement.getTransfer(
transferId
);
registerProcess(asset, process);
}
} catch (InterruptedException e) {
monitor.info(String.format("Process thread for asset %s transfer %s has been interrupted. Giving up.", asset, transferId),e);
} catch(IOException e) {
monitor.warning(String.format("Process thread for asset %s transfer %s run into problem. Giving up.", asset, transferId),e);
}

if (process == null || !process.getState().equals(expectedTransferState)) {
deactivate(asset);
throw new InternalServerErrorException(String.format("Transfer process %s for agreement %s and asset %s could not be provisioned.", transferId, agreement.getId(), asset));
}

// finally wait a bit for the endpoint data reference in case
// that the process was signalled earlier than the callbacks
startTime = System.currentTimeMillis();

EndpointDataReference reference=null;

try {
while ((System.currentTimeMillis() - startTime < config.getNegotiationTimeout()) && (reference == null)) {
Thread.sleep(config.getNegotiationPollInterval());
synchronized(endpointStore) {
reference=endpointStore.get(asset);
}
}
} catch (InterruptedException e) {
monitor.info(String.format("Wait thread for reference to asset %s has been interrupted. Giving up.", asset),e);
}

// mark the type in the endpoint
if(reference!=null) {
for(Map.Entry<String,JsonValue> prop : assetProperties.entrySet()) {
reference.getProperties().put(prop.getKey(), JsonLd.asString(prop.getValue()));
}
}
registerAgreement(asset,agreement, assetProperties);

// now delegate to the original getter
return get(asset);
Loading

0 comments on commit 7f5bc56

Please sign in to comment.